Python SDK
The Python package `decentriq_platform` provides all of the tools needed to interact with the Decentriq platform. Some extra functionality, such as the ability to define SQL computations to be run in an enclave, is provided via additional submodules, also called compute modules.
The package `decentriq_platform` can be used to:

- create DCRs (Data Clean Rooms)
- encrypt and upload input data
- trigger computation of authorized queries
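The snippet below sketches this end-to-end flow using names re-exported from the main namespace. It is only an illustration: the `auth` object, the data room id, the node ids, and the `enclave_specifications.versions` selection are assumptions for this sketch, not prescribed values.

```python
import io
import decentriq_platform as dq

client = dq.create_client(
    "alice@example.com",  # user_email
    "<API_TOKEN>",        # api_token from the Decentriq UI
    integrate_with_platform=True,
)

# Select enclave specifications (assuming `versions` selects them by name).
enclaves = dq.enclave_specifications.versions(
    ["decentriq.driver:v4", "decentriq.sql-worker:v4"]
)

auth = ...            # an Auth object for the current user (construction not shown)
data_room_id = "..."  # id of a previously published data room (hex string)

session = client.create_session(auth, enclaves)

# Encrypt and upload a dataset (assuming `Key()` creates fresh key material),
# publish it to a data node, and run an authorized computation.
key = dq.Key()
manifest_hash = client.upload_dataset(
    io.BytesIO(b"name,value\na,1\n"), key, "data.csv"
)
session.publish_dataset(data_room_id, manifest_hash, "my_data_node", key)
result = session.run_computation_and_get_results(data_room_id, "my_compute_node")
```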
The currently available compute modules are:

- `decentriq_platform.sql` - for SQL-based computations
- `decentriq_platform.container` - for containerized Python-based computations
To learn more, click on these modules in the Available Modules section on the right.
Under API Documentation below you will also find a list of all the methods and classes provided as part of each module.
Frequently used classes and methods are generally re-exported as part of the main namespace `decentriq_platform`. This way, users of our SDK won't have to import functionality from a large set of submodules.
Note that some methods accept or return proto objects. These are objects used in the communication with our platform and, due to their special structure, are documented separately under the Proto section. Each module exposes a `.proto` submodule that contains all the proto objects required by that particular module.
The source code of the Decentriq Python SDK can be found on GitHub.
Installation
Requirements
Make sure you are running:
- Python 3.9 or above
- One of the following operating systems: macOS (≥ 10.9), Windows (64-bit), or Linux (64-bit)
Decentriq suggests using the SDK from a clean Python environment. Follow the instructions below to create the environment and install the package:
1. Open a terminal window.
2. Create a new directory and navigate into it.
3. Create a new virtual environment to install into. You can use, for instance, Python's own venv module:

   ```
   python -m venv env
   source env/bin/activate
   ```
4. Make sure that you have the latest version of pip installed:

   ```
   pip install --upgrade pip
   ```
5. Install the `decentriq_platform` package:

   ```
   pip install decentriq_platform
   ```
6. Test that the installation was successful. The following shouldn't print an error:

   ```
   python -c "import decentriq_platform"
   ```
Available enclaves
The following enclave versions are currently available with the latest SDK version (v0.11.0):
- `decentriq.driver:v4`: Latest version of the driver enclave, responsible for communicating with the SDK and orchestrating computation tasks.
- `decentriq.driver:v2`: Previous version of the driver enclave (kept for backwards compatibility).
- `decentriq.sql-worker:v4`: Latest version of the SQL worker capable of running SQL queries.
- `decentriq.sql-worker:v2`: Previous stable version of the SQL worker enclave (kept for backwards compatibility).
- `decentriq.python-ml-worker:v2`: An AWS Nitro-based enclave capable of running Python (version 3.9.12) code focused on machine learning. The following libraries are available within the enclave: numpy (1.21.5), scikit-learn (1.0.2), pandas (1.4.2), statsmodels (0.13.2), xgboost (1.5.2), auto-sklearn (0.14.7).
- `decentriq.python-synth-data-worker:v2`: Another AWS Nitro-based enclave capable of producing synthetic data based on a source of potentially sensitive input data.
- `decentriq.r-latex-worker:v2`: An AWS Nitro enclave for running code written in R. The enclave contains a wide array of R libraries (R version 4.1.2), including several for the creation of LaTeX-based reports: data.table (1.14.2), knitr (1.36), rmarkdown (2.11), tinytex (0.34), Hmisc (4.6), pROC (1.18.0), ggplot2 (3.3.5), ggmosaic (0.3.3), dplyr (1.0.7), ggforce (0.3.3), kableExtra (1.3.4).
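From the SDK, these enclaves are referenced through their specifications, which are later passed to `Client.create_session`. A minimal sketch, assuming `enclave_specifications.versions` selects specifications by the versioned names listed above:

```python
from decentriq_platform import enclave_specifications

# Pick the driver and the SQL worker in the versions listed above.
enclaves = enclave_specifications.versions([
    "decentriq.driver:v4",
    "decentriq.sql-worker:v4",
])
```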
```python
"""
.. include:: ../../decentriq_platform_docs/gcg_getting_started.md
---
"""
__docformat__ = "restructuredtext"

from .client import Client, create_client, Session
from .platform import ClientPlatformFeatures, SessionPlatformFeatures
from .builders import (
    DataRoomBuilder,
    DataRoomCommitBuilder,
    GovernanceProtocol
)
from .compute import Noop, StaticContent
from .permission import Permissions
from .storage import Key
from .attestation import enclave_specifications, EnclaveSpecifications

__all__ = [
    "create_client",
    "Client",
    "Session",
    "DataRoomBuilder",
    "DataRoomCommitBuilder",
    "Permissions",
    "GovernanceProtocol",
    "enclave_specifications",
    "EnclaveSpecifications",
    "Key",
    "StaticContent",
    "Noop",
    "sql",
    "container",
    "storage",
    "attestation",
    "types",
    "authentication",
    "platform",
    "session",
    "node",
]
```
```python
def create_client(
        user_email: str,
        api_token: str,
        *,
        integrate_with_platform: bool,
        client_id: str = DECENTRIQ_CLIENT_ID,
        api_host: str = DECENTRIQ_HOST,
        api_port: int = DECENTRIQ_PORT,
        api_use_tls: bool = DECENTRIQ_USE_TLS,
        api_core_host: Optional[str] = DECENTRIQ_API_CORE_HOST,
        api_core_port: Optional[int] = DECENTRIQ_API_CORE_PORT,
        api_core_use_tls: Optional[bool] = DECENTRIQ_API_CORE_USE_TLS,
        api_platform_host: Optional[str] = DECENTRIQ_API_PLATFORM_HOST,
        api_platform_port: Optional[int] = DECENTRIQ_API_PLATFORM_PORT,
        api_platform_use_tls: Optional[bool] = DECENTRIQ_API_PLATFORM_USE_TLS,
        request_timeout: Optional[int] = None
) -> Client:
    """
    The primary way to create a `Client` object.

    Client objects created using this method can optionally be integrated
    with the Decentriq UI. This means that certain additional features, such
    as being able to retrieve the list of data rooms that you participate
    in, will be made available via the `Client.platform` field.
    This flag also needs to be set if you want to use the authentication
    objects that let Decentriq act as the root CA that controls access to
    data rooms.

    In order to provide these features, the client will communicate directly
    with an API that exists outside of the confidential computing
    environment. This setting will only affect how metadata about data rooms
    (such as their name and id) is stored and retrieved; it will not in any
    way compromise the security of how your data is uploaded to the enclaves
    or how computed results are retrieved.

    **Parameters**:
    - `api_token`: An API token with which to authenticate oneself.
        The API token can be obtained in the user account settings in the
        Decentriq UI.
    - `user_email`: The email address of the user that generated the given
        API token.
    - `integrate_with_platform`: Whether to configure the client to
        integrate itself with our web platform. When this setting is set to
        False, none of the features provided via the `platform` field will
        be available.
    """
    api = API(
        api_token,
        client_id,
        api_core_host if api_core_host is not None else api_host,
        api_core_port if api_core_port is not None else api_port,
        api_prefix="/api/core",
        use_tls=api_core_use_tls if api_core_use_tls is not None else api_use_tls,
        timeout=request_timeout
    )
    if integrate_with_platform:
        platform = ClientPlatformFeatures(
            api_token,
            user_email,
            http_api=api,
            client_id=client_id,
            api_host=api_platform_host if api_platform_host is not None else api_host,
            api_port=api_platform_port if api_platform_port is not None else api_port,
            api_use_tls=api_platform_use_tls if api_platform_use_tls is not None else api_use_tls
        )
    else:
        platform = None
    return Client(user_email, api, platform, request_timeout=request_timeout)
```
The primary way to create a `Client` object.

Client objects created using this method can optionally be integrated with the Decentriq UI. This means that certain additional features, such as being able to retrieve the list of data rooms that you participate in, will be made available via the `Client.platform` field. This flag also needs to be set if you want to use the authentication objects that let Decentriq act as the root CA that controls access to data rooms.

In order to provide these features, the client will communicate directly with an API that exists outside of the confidential computing environment. This setting will only affect how metadata about data rooms (such as their name and id) is stored and retrieved; it will not in any way compromise the security of how your data is uploaded to the enclaves or how computed results are retrieved.
**Parameters**:

- `api_token`: An API token with which to authenticate oneself. The API token can be obtained in the user account settings in the Decentriq UI.
- `user_email`: The email address of the user that generated the given API token.
- `integrate_with_platform`: Whether to configure the client to integrate itself with our web platform. When this setting is set to False, none of the features provided via the `platform` field will be available.
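For example (the email and token are placeholders):

```python
from decentriq_platform import create_client

client = create_client(
    "alice@example.com",  # user_email
    "<API_TOKEN>",        # api_token from the account settings in the Decentriq UI
    integrate_with_platform=True,
)
```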
```python
class Client:
    """
    A `Client` object allows you to upload datasets and to create `Session`
    objects that can communicate with enclaves and perform essential
    operations such as publishing data rooms, executing computations, and
    retrieving results.

    Objects of this class can be used to create and run data rooms, as well
    as to securely upload data and retrieve computation results.

    Objects of this class should be created using the `create_client`
    function.
    """
    _api: API
    _platform: Optional[ClientPlatformFeatures]

    # Bodies of the public methods are omitted here; each method's full
    # source is shown in its own listing below.

    def __init__(
            self,
            user_email: str,
            api: API,
            platform: Optional[ClientPlatformFeatures] = None,
            request_timeout: int = None
    ): ...

    def check_enclave_availability(self, specs: Dict[str, EnclaveSpecification]): ...

    def create_session(
            self,
            auth: Auth,
            enclaves: Dict[str, EnclaveSpecification],
    ) -> Session: ...

    def get_scope(self, email: str, scope_id: str) -> ScopeJson: ...

    def get_scope_by_metadata(
            self, email: str, metadata: Dict[str, str]
    ) -> Optional[str]: ...

    def delete_scope(self, email: str, scope_id: str): ...

    def upload_dataset(
            self,
            data: BinaryIO,
            key: Key,
            file_name: str,
            /, *,
            description: str = "",
            chunk_size: int = 8 * 1024 ** 2,
            parallel_uploads: int = 8,
            owner_email: Optional[str] = None,
    ) -> str: ...

    def get_dataset(self, manifest_hash: str) -> Optional[DatasetDescription]: ...

    def get_all_datasets(self) -> List[DatasetDescription]: ...

    def delete_dataset(self, manifest_hash: str, force: bool = False): ...

    @property
    def platform(self) -> ClientPlatformFeatures:
        """
        Provider of a list of convenience methods to interact with the
        Decentriq platform.

        This field exposes an object that provides a set of features known
        from the Decentriq web platform.
        """

    @property
    def is_integrated_with_platform(self) -> bool:
        """Whether this client has been created with platform integration."""
```
A `Client` object allows you to upload datasets and to create `Session` objects that can communicate with enclaves and perform essential operations such as publishing data rooms, executing computations, and retrieving results.

Objects of this class can be used to create and run data rooms, as well as to securely upload data and retrieve computation results.

Objects of this class should be created using the `create_client` function.
```python
def __init__(
        self,
        user_email: str,
        api: API,
        platform: Optional[ClientPlatformFeatures] = None,
        request_timeout: int = None
):
    """
    Create a client instance.

    Rather than creating `Client` instances directly using this
    constructor, use the function `create_client`.
    """
    self.user_email = user_email
    self._api = api
    self._platform = platform
    self.request_timeout = request_timeout
```
Create a client instance.

Rather than creating `Client` instances directly using this constructor, use the function `create_client`.
```python
def check_enclave_availability(self, specs: Dict[str, EnclaveSpecification]):
    """
    Check whether the selected enclaves are deployed at this moment.
    If one of the enclaves is not deployed, an exception will be raised.
    """
    available_specs = \
        {spec["proto"].SerializeToString() for spec in self._get_enclave_specifications()}
    # Iterate over the selected specifications (the values of the dict).
    for s in specs.values():
        if s["proto"].SerializeToString() not in available_specs:
            raise Exception(
                "No available enclave deployed for attestation spec '{name}' (version {version})".
                format(name=s["name"], version=s["version"])
            )
```
Check whether the selected enclaves are deployed at this moment. If one of the enclaves is not deployed, an exception will be raised.
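For example, reusing an `enclaves` dictionary as returned by `enclave_specifications.versions` (an assumed selection API, see above):

```python
enclaves = enclave_specifications.versions(
    ["decentriq.driver:v4", "decentriq.sql-worker:v4"]
)
# Raises an exception if one of the selected enclaves is not deployed.
client.check_enclave_availability(enclaves)
```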
```python
def create_session(
        self,
        auth: Auth,
        enclaves: Dict[str, EnclaveSpecification],
) -> Session:
    """
    Creates a new `decentriq_platform.session.Session` instance to
    communicate with a driver enclave.
    The passed set of enclave specifications must include a specification
    for a driver enclave.

    Messages sent through this session will be authenticated
    with the given authentication object.
    """
    url = Endpoints.SESSIONS
    if "decentriq.driver" not in enclaves:
        raise Exception(
            "Unable to find a specification for the driver enclave"
            " named 'decentriq.driver', you can get these specifications"
            " from the main package."
        )
    driver_spec = enclaves["decentriq.driver"]
    if "clientProtocols" not in driver_spec:
        raise Exception("Missing client supported protocol versions")
    attestation_proto = driver_spec["proto"]
    client_protocols = driver_spec["clientProtocols"]
    attestation_specification_hash = \
        hashlib.sha256(serialize_length_delimited(attestation_proto)).hexdigest()
    req_body = CreateSessionRequest(
        attestationSpecificationHash=attestation_specification_hash
    )
    response: SessionJsonResponse = self._api.post(
        url,
        json.dumps(req_body),
        {"Content-type": "application/json"}
    ).json()
    platform_api = \
        self.platform._platform_api if self.is_integrated_with_platform else None
    session = Session(
        self,
        response["sessionId"],
        attestation_proto,
        client_protocols,
        auth=auth,
        platform_api=platform_api,
    )
    return session
```
Creates a new `decentriq_platform.session.Session` instance to communicate with a driver enclave. The passed set of enclave specifications must include a specification for a driver enclave.

Messages sent through this session will be authenticated with the given authentication object.
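For example (the `auth` object must be obtained separately, e.g. via the `decentriq_platform.authentication` submodule; its construction is not shown in this sketch):

```python
enclaves = enclave_specifications.versions([
    "decentriq.driver:v4",
    "decentriq.sql-worker:v4",
])
auth = ...  # an Auth object for the current user
session = client.create_session(auth, enclaves)
```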
```python
def get_scope(self, email: str, scope_id: str) -> ScopeJson:
    url = Endpoints.USER_SCOPE \
        .replace(":userId", email) \
        .replace(":scopeId", scope_id)
    response: ScopeJson = self._api.get(url).json()
    return response
```
```python
def get_scope_by_metadata(self, email: str, metadata: Dict[str, str]) -> Optional[str]:
    url = Endpoints.USER_SCOPES_COLLECTION.replace(":userId", email)
    response: List[ScopeJson] = self._api.get(
        url,
        params={"metadata": json.dumps(metadata)}
    ).json()
    if len(response) == 0:
        return None
    else:
        scope = response[0]
        return scope["scopeId"]
```
```python
def delete_scope(self, email: str, scope_id: str):
    url = Endpoints.USER_SCOPE \
        .replace(":userId", email) \
        .replace(":scopeId", scope_id)
    self._api.delete(url)
```
```python
def upload_dataset(
        self,
        data: BinaryIO,
        key: Key,
        file_name: str,
        /, *,
        description: str = "",
        chunk_size: int = 8 * 1024 ** 2,
        parallel_uploads: int = 8,
        owner_email: Optional[str] = None,
) -> str:
    """
    Uploads `data` as a file usable by enclaves and returns the
    corresponding manifest hash.

    **Parameters**:
    - `data`: The data to upload as a buffered stream. Such an object can
        be obtained by wrapping a binary string in an `io.BytesIO()` object
        or, if reading from a file, by using `with open(path, "rb") as file`.
    - `key`: Encryption key used to encrypt the file.
    - `file_name`: Name of the file.
    - `description`: An optional file description.
    - `chunk_size`: Size in bytes of the chunks into which the stream is split.
    - `parallel_uploads`: The number of chunks to upload in parallel.
    - `owner_email`: Owner of the file if different from the one already
        specified when creating the client object.
    """
    uploader = BoundedExecutor(
        bound=parallel_uploads * 2,
        max_workers=parallel_uploads
    )
    email = owner_email if owner_email else self.user_email

    # create and upload chunks
    chunker = Chunker(data, chunk_size=chunk_size)
    chunk_hashes: List[str] = []
    chunk_uploads_futures = []
    upload_description = self._create_upload(email)
    for chunk_hash, chunk_data in chunker:
        chunk_uploads_futures.append(
            uploader.submit(
                self._encrypt_and_upload_chunk,
                chunk_hash,
                chunk_data,
                key.material,
                email,
                upload_description["uploadId"]
            )
        )
        chunk_hashes.append(chunk_hash.hex())

    # check chunks uploads were successful
    completed, pending = futures.wait(
        chunk_uploads_futures,
        None,
        futures.FIRST_EXCEPTION
    )
    if len(pending):
        # re-raise exception
        for future in completed:
            future.result()
    uploader.shutdown(wait=False)

    # create manifest and upload
    manifest_hash, manifest_encrypted = create_encrypted_chunk(
        key.material,
        os.urandom(16),
        json.dumps(chunk_hashes).encode("utf-8")
    )
    scope_id = self._ensure_scope_with_metadata(
        email,
        {
            "type": ScopeTypes.USER_FILE,
            "manifest_hash": manifest_hash.hex()
        }
    )
    self._finalize_upload(
        user_id=email,
        scope_id=scope_id,
        upload_id=upload_description["uploadId"],
        name=file_name,
        manifest_hash=manifest_hash,
        manifest_encrypted=manifest_encrypted,
        chunks=chunk_hashes
    )

    manifest_hash_hex = manifest_hash.hex()
    if self.is_integrated_with_platform:
        self.platform._platform_api.save_dataset_metadata(
            manifest_hash_hex,
            file_name=file_name,
            description=description,
            owner_email=self.user_email
        )
    return manifest_hash_hex
```
Uploads `data` as a file usable by enclaves and returns the corresponding manifest hash.

**Parameters**:

- `data`: The data to upload as a buffered stream. Such an object can be obtained by wrapping a binary string in an `io.BytesIO()` object or, if reading from a file, by using `with open(path, "rb") as file`.
- `key`: Encryption key used to encrypt the file.
- `file_name`: Name of the file.
- `description`: An optional file description.
- `chunk_size`: Size in bytes of the chunks into which the stream is split.
- `parallel_uploads`: The number of chunks to upload in parallel.
- `owner_email`: Owner of the file if different from the one already specified when creating the client object.
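For example, uploading a CSV file from disk (the file name is a placeholder, and `Key()` is assumed here to generate fresh random key material):

```python
from decentriq_platform import Key

key = Key()  # assumed to create a new random encryption key
with open("data.csv", "rb") as file:
    manifest_hash = client.upload_dataset(file, key, "data.csv")
```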
```python
def get_dataset(
        self,
        manifest_hash: str
) -> Optional[DatasetDescription]:
    """
    Returns information about a user file given a dataset id.
    """
    url = Endpoints.USER_FILE \
        .replace(":userId", self.user_email) \
        .replace(":manifestHash", manifest_hash)
    try:
        response = self._api.get(url).json()
        return DatasetDescription(
            datasetId=response["manifestHash"],
            name=response["filename"],
            creationDate=response["creationDate"],
        )
    except ServerError:
        return None
```
Returns information about a user file given a dataset id.
```python
def get_all_datasets(self) -> List[DatasetDescription]:
    """
    Returns the list of files uploaded by the user.
    """
    url = Endpoints.USER_FILES \
        .replace(":userId", self.user_email)
    response = self._api.get(url)
    data = response.json()
    result = []
    for dataset in data:
        result.append(
            DatasetDescription(
                datasetId=dataset["manifestHash"],
                name=dataset["filename"],
                creationDate=dataset["creationDate"],
            )
        )
    return result
```
Returns the list of files uploaded by the user.
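A short sketch, assuming `DatasetDescription` supports dictionary-style access (as its construction from JSON fields above suggests):

```python
for dataset in client.get_all_datasets():
    print(dataset["name"], dataset["datasetId"], dataset["creationDate"])
```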
```python
def delete_dataset(self, manifest_hash: str, force: bool = False):
    """
    Deletes the dataset with the given id from the Decentriq platform.

    In case the dataset is still published to one or more data rooms,
    an exception will be thrown and the dataset will need to be
    unpublished manually from the respective data rooms using
    `Session.remove_published_dataset`.
    This behavior can be circumvented by using the `force` flag.
    Note, however, that this might put some data rooms in a broken
    state as they might try to read data that does not exist anymore.
    """
    if self.is_integrated_with_platform:
        data_room_ids = self.platform._platform_api. \
            get_data_rooms_with_published_dataset(manifest_hash)
        if data_room_ids:
            list_of_ids = "\n".join([f"- {dcr_id}" for dcr_id in data_room_ids])
            if force:
                print(
                    "This dataset is published to the following data rooms."
                    " These data rooms might be in a broken state now:"
                    f"\n{list_of_ids}"
                )
            else:
                raise Exception(
                    "This dataset is published to the following data rooms"
                    " and needs to be unpublished before it can be deleted!"
                    f"\n{list_of_ids}"
                )
    url = Endpoints.USER_FILE \
        .replace(":userId", self.user_email) \
        .replace(":manifestHash", manifest_hash)
    self._api.delete(url)
    if self.is_integrated_with_platform:
        try:
            self.platform._platform_api.delete_dataset_metadata(manifest_hash)
        except Exception as e:
            print(f"Error when deleting dataset: {e}")
```
Deletes the dataset with the given id from the Decentriq platform.

In case the dataset is still published to one or more data rooms, an exception will be thrown and the dataset will need to be unpublished manually from the respective data rooms using `Session.remove_published_dataset`. This behavior can be circumvented by using the `force` flag. Note, however, that this might put some data rooms in a broken state as they might try to read data that does not exist anymore.
Provider of a list of convenience methods to interact with the Decentriq platform. This field exposes an object that provides a set of features known from the Decentriq web platform.
Whether this client has been created with platform integration.
```python
class Session():
    """
    Class for managing the communication with an enclave.
    """
    client: Client
    session_id: str
    enclave_identifier: str
    auth: Auth
    email: str
    keypair: Any
    fatquote: Fatquote
    quote: QuoteBody
    driver_attestation_specification: AttestationSpecification
    client_protocols: List[int]

    # Method bodies are omitted below; each docstring describes the
    # method's behavior.

    def __init__(
            self,
            client: Client,
            session_id: str,
            driver_attestation_specification: AttestationSpecification,
            client_protocols: List[int],
            auth: Auth,
            platform_api: Optional[PlatformApi] = None,
    ):
        """
        `Session` instances should not be instantiated directly but rather
        be created using a `Client` object using
        `decentriq_platform.Client.create_session`.
        """

    def send_request(
            self,
            request: GcgRequest,
            protocol: int,
    ) -> List[GcgResponse]:
        """
        Low-level method for sending a raw `GcgRequest` to the enclave.
        Use this method if any of the convenience methods (such as
        `run_computation`) don't perform the exact task you want.
        """

    def publish_data_room(
            self,
            data_room_definition: Tuple[DataRoom, List[ConfigurationModification]]
    ) -> str:
        """
        Create a data room with the provided protobuf configuration object
        and have the enclave apply the given list of modifications to the
        data room configuration.

        The id returned from this method will be used when interacting with
        the published data room (for example when running computations or
        publishing datasets).
        """

    def publish_data_room_configuration_commit(
            self,
            configuration_commit: ConfigurationCommit
    ) -> str:
        """
        Publish the given data room configuration commit.

        Configuration commits can be built using a `DataRoomCommitBuilder`
        object.

        The id returned from this method will be used when running
        development computations or when trying to merge this commit into
        the main data room configuration.
        """

    def retrieve_configuration_commit(
            self,
            configuration_commit_id: str,
    ) -> ConfigurationCommit:
        """
        Retrieve the content of the given configuration commit id.

        **Returns**:
        A `ConfigurationCommit`.
        """

    def retrieve_configuration_commit_approvers(
            self,
            configuration_commit_id: str,
    ) -> List[str]:
        """
        Retrieve the list of users who need to approve the merger of a
        given configuration commit.

        **Returns**:
        A list of ids belonging to the users that need to approve the
        configuration commit.
        """

    def generate_merge_approval_signature(
            self,
            configuration_commit_id: str
    ) -> bytes:
        """
        Generate an approval signature required for merging a configuration
        commit.

        To merge a specific configuration commit, each user referenced in
        the list of ids returned by `retrieveConfigurationCommitApprovers`
        needs to generate an approval signature using this method.
        """

    def merge_configuration_commit(
            self,
            configuration_commit_id: str,
            approval_signatures: Dict[str, bytes],
    ) -> MergeConfigurationCommitResponse:
        """
        Request the enclave to merge the given configuration commit into
        the main data room configuration.

        **Parameters**:
        - `configuration_commit_id`: The id of the commit to be merged.
        - `approval_signatures`: A dictionary containing the approval
            signature for each of the required approvers, e.g.
            `{ "some@email.com": signature }`.
        """

    def retrieve_data_room_configuration_history(
            self,
            data_room_id: str
    ) -> RetrieveDataRoomConfigurationHistoryResponse:
        """
        Retrieve the current merged data room configuration, as well as the
        history of configuration commits that have already been merged.
        """

    def retrieve_current_data_room_configuration(
            self,
            data_room_id: str
    ) -> Tuple[DataRoomConfiguration, str]:
        """
        Retrieve the current data room configuration, as well as the
        current "history pin".

        A history pin is the hash of all the ids of configuration commits
        that make up the structure of a data room. This pin therefore
        uniquely identifies a data room's structure at a certain point in
        time. A data room configuration, as well as its associated history
        pin, can be used to extend an existing data room (for example by
        adding new compute nodes). Extending an existing data room is done
        using the `DataRoomCommitBuilder` class.
        """

    def stop_data_room(self, data_room_id: str):
        """
        Stop the data room with the given id, making it impossible to run
        new computations.
        """

    def retrieve_data_room_status(
            self,
            data_room_id: str
    ) -> str:
        """
        Returns the status of the data room. Valid values are `"Active"`
        or `"Stopped"`.
        """

    def retrieve_data_room_definition(
            self,
            data_room_id: str
    ) -> RetrieveDataRoomResponse:
        """
        Returns the underlying protobuf object for the data room.
        """

    def retrieve_audit_log(
            self,
            data_room_id: str
    ) -> RetrieveAuditLogResponse:
        """
        Returns the audit log for the data room.
        """

    def publish_dataset(
            self,
            data_room_id: str,
            manifest_hash: str,
            leaf_id: str,
            key: Key,
            *,
            force: bool = False
    ) -> PublishDatasetToDataRoomResponse:
        """
        Publishes a file and its encryption key to a data room.
        Neither the file nor the encryption key will ever be stored in
        unencrypted form.

        This method will check whether the to-be-published file exists.
        If this is not the case, an exception will be raised.
        This behavior can be disabled by setting the `force` flag.

        In case the original client was created with platform integration
        enabled, the method will further check whether there already is a
        dataset published for the given data room. In this case, an
        exception will be thrown and the dataset will need to be
        unpublished first.
        """

    def remove_published_dataset(
            self,
            data_room_id: str,
            leaf_id: str,
    ) -> RemovePublishedDatasetResponse:
        """
        Removes a published dataset from the data room.
        """

    def retrieve_published_datasets(
            self,
            data_room_id: str,
    ) -> RetrievePublishedDatasetsResponse:
        """
        Returns the datasets published to the given data room.
        """

    def get_computation_status(self, job_id: str) -> JobStatusResponse:
        """
        Returns the status of the provided `job_id`, which will include the
        names of the nodes that completed their execution.
        """

    def run_computation(
            self,
            data_room_id: str,
            compute_node_id: str,
            /, *,
            dry_run: bool = False,
    ) -> JobId:
        """
        Run a specific computation within the data room with the given id.

        The result will be an identifier object of the job executing the
        computation. This object is required for checking a job's status
        and retrieving its results.
        """

    def wait_until_computation_has_finished(
            self,
            job_id: JobId,
            /, *,
            interval: int = 5,
            timeout: int = None
    ):
        """
        Wait for the given job to complete.

        The method will check for the job's completeness every `interval`
        seconds and up to an optional `timeout` seconds after which the
        method will raise an exception.
        """

    def run_dev_computation(
            self,
            configuration_commit_id: str,
            compute_node_id: str,
            /, *,
            dry_run: bool = False,
    ) -> JobId:
        """
        Run a specific computation within the context of the data room
        configuration defined by the given commit id. Such "development"
        computations can also be run for configuration commits that have
        not yet been merged.

        The result will be an identifier object of the job executing the
        computation. This object is required for checking a job's status
        and retrieving its results.
        """

    def get_computation_result(
            self,
            job_id: JobId,
            /, *,
            interval: int = 5,
            timeout: int = None
    ) -> bytes:
        """
        Wait for the given job to complete and retrieve its results as a
        raw byte string.

        The method will check for the job's completeness every `interval`
        seconds and up to an optional `timeout` seconds after which the
        method will raise an exception.

        If the job completes and the results can be retrieved successfully,
        a raw byte string will be returned. The byte string can be
        transformed into a more useful object using a variety of helper
        methods. These helper methods are specific to the type of
        computation you ran and can be found in the corresponding packages.
        """

    def run_computation_and_get_results(
            self,
            data_room_id: str,
            compute_node_id: str,
            /, *,
            interval: int = 5,
            timeout: int = None
    ) -> Optional[bytes]:
        """
        Run a specific computation and return its results.

        This method is simply a wrapper for running `run_computation` and
        `get_computation_result` directly after each other.
        """

    @property
    def platform(self) -> SessionPlatformFeatures:
        """
        Provider of a list of convenience methods to interact with the
        Decentriq platform.

        This field exposes an object that provides a set of convenience
        features known from the Decentriq web platform. These include, for
        example, getting the list of data sets that are also visible in the
        browser-based platform UI.
        """

    @property
    def is_integrated_with_platform(self): ...
```
Class for managing the communication with an enclave.
def __init__( self, client: Client, session_id: str, driver_attestation_specification: AttestationSpecification, client_protocols: List[int], auth: Auth, platform_api: Optional[PlatformApi] = None, ): """ `Session` instances should not be instantiated directly but rather be created using a `Client` object using `decentriq_platform.Client.create_session`. """ url = Endpoints.SESSION_FATQUOTE.replace(":sessionId", session_id) response: FatquoteResBody = client._api.get(url).json() fatquote_bytes = b64decode(response["fatquoteBase64"]) fatquote = Fatquote() fatquote.ParseFromString(fatquote_bytes) verification = Verification(attestation_specification=driver_attestation_specification) report_data = verification.verify(fatquote) self.client = client self.session_id = session_id self.auth = auth self.email = auth.user_id self.keypair = chily.Keypair.from_random() self.fatquote = fatquote self.report_data = report_data self.driver_attestation_specification = driver_attestation_specification self.client_protocols = client_protocols if platform_api: self._platform = SessionPlatformFeatures(self, platform_api) else: self._platform = None
`Session` instances should not be instantiated directly but rather created from a `Client` object via `decentriq_platform.Client.create_session`.
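For orientation, here is a minimal sketch of how a session is typically obtained. The client-construction and authentication helpers (`create_client`, `create_auth`) and the `enclave_specifications.versions` accessor are assumptions based on common usage of this SDK; consult the `Client` documentation for the exact signatures.

import decentriq_platform as dq

# Hypothetical credentials; replace with your own.
client = dq.create_client("user@example.com", "your-api-token")  # assumed helper

# Select the enclaves the session should talk to.
enclave_specs = dq.enclave_specifications.versions([  # assumed catalogue accessor
    "decentriq.driver:v4",
    "decentriq.sql-worker:v4",
])

auth = client.create_auth("user@example.com")  # assumed authentication helper
session = client.create_session(auth, enclave_specs)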
def send_request( self, request: GcgRequest, protocol: int, ) -> List[GcgResponse]: """ Low-level method for sending a raw `GcgRequest` to the enclave. Use this method if any of the convenience methods (such as `run_computation`) don't perform the exact task you want. """ gcg_protocol = serialize_length_delimited( ComputeNodeProtocol( version=protocol ) ) serialized_request = serialize_length_delimited( Request( deltaRequest= self._encrypt_and_encode_data( gcg_protocol + serialize_length_delimited(request), self.auth ) ) ) url = Endpoints.SESSION_MESSAGES.replace(":sessionId", self.session_id) enclave_request = EnclaveMessage(data=b64encode(serialized_request).decode("ascii")) enclave_response: bytes = self.client._api.post( url, json.dumps(enclave_request), {"Content-type": "application/json", "Accept-Version": "2"} ).content responses: List[GcgResponse] = [] offset = 0 while offset < len(enclave_response): response_container = Response() offset += parse_length_delimited(enclave_response[offset:], response_container) if response_container.HasField("unsuccessfulResponse"): raise Exception(response_container.unsuccessfulResponse) else: response = GcgResponse() decrypted_response = self._decode_and_decrypt_data( response_container.successfulResponse ) response_protocol = ComputeNodeProtocol() response_offset = parse_length_delimited( decrypted_response, response_protocol ) if response_protocol.version != protocol: raise Exception("Different response protocol version than requested") parse_length_delimited(decrypted_response[response_offset:], response) if response.HasField("failure"): raise Exception(response.failure) responses.append(response) return responses
Low-level method for sending a raw `GcgRequest` to the enclave. Use this method if any of the convenience methods (such as `run_computation`) don't perform the exact task you want.
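As a sketch of what a raw request looks like, the following sends the same message that the convenience method `retrieve_data_room_definition` (shown further below) builds internally. The import path is an assumption based on the `.proto` submodule convention described earlier.

from decentriq_platform.proto import GcgRequest, RetrieveDataRoomRequest  # assumed import path

request = GcgRequest(
    retrieveDataRoomRequest=RetrieveDataRoomRequest(
        dataRoomId=bytes.fromhex(data_room_id),
    )
)
responses = session.send_request(request, 1)  # this endpoint speaks protocol version 1
if not responses[0].HasField("retrieveDataRoomResponse"):
    raise Exception("Unexpected response type")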
def publish_data_room(
    self,
    data_room_definition: Tuple[DataRoom, List[ConfigurationModification]]
) -> str:
    """
    Create a data room with the provided protobuf configuration object and
    have the enclave apply the given list of modifications to the data room
    configuration.

    The id returned from this method will be used when interacting with the
    published data room (for example when running computations or publishing
    datasets).
    """
    response = self._publish_data_room(data_room_definition)
    return _get_data_room_id(response).hex()
Create a data room with the provided protobuf configuration object and have the enclave apply the given list of modifications to the data room configuration.
The id returned from this method will be used when interacting with the published data room (for example when running computations or publishing datasets).
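Assuming a `session` and an `enclave_specs` dictionary as above, publishing a freshly built data room looks roughly like this (the `DataRoomBuilder` API is documented further below):

builder = DataRoomBuilder("My data clean room", enclave_specs=enclave_specs)
# ... add data nodes, compute nodes and user permissions here ...
data_room_definition = builder.build()  # a (DataRoom, modifications) tuple
data_room_id = session.publish_data_room(data_room_definition)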
def publish_data_room_configuration_commit( self, configuration_commit: ConfigurationCommit ) -> str: """ Publish the given data room configuration commit. Configuration commits can be built using a `DataRoomCommitBuilder` object. The id returned from this method will be used when running development computations or when trying to merge this commit into the main data room configuration. """ endpoint_protocols = [1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA, "data_room_id": configuration_commit.dataRoomId.hex(), } ) request = CreateConfigurationCommitRequest( commit=configuration_commit, scope=bytes.fromhex(scope_id) ) responses = self.send_request( GcgRequest(createConfigurationCommitRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("createConfigurationCommitResponse"): raise Exception( "Expected createConfigurationCommitResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.createConfigurationCommitResponse.commitId.hex()
Publish the given data room configuration commit.
Configuration commits can be built using a `DataRoomCommitBuilder` object.
The id returned from this method will be used when running development computations or when trying to merge this commit into the main data room configuration.
def retrieve_configuration_commit( self, configuration_commit_id: str, ) -> ConfigurationCommit: """ Retrieve the content of given configuration commit id. **Returns**: A `ConfigurationCommit`. """ endpoint_protocols = [1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_COMMITS_DATA, } ) request = RetrieveConfigurationCommitRequest( commitId=bytes.fromhex(configuration_commit_id), scope=bytes.fromhex(scope_id), ) responses = self.send_request( GcgRequest(retrieveConfigurationCommitRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("retrieveConfigurationCommitResponse"): raise Exception( "Expected retrieveConfigurationCommitResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.retrieveConfigurationCommitResponse.commit
Retrieve the content of the given configuration commit id.
Returns: A `ConfigurationCommit`.
def retrieve_configuration_commit_approvers( self, configuration_commit_id: str, ) -> List[str]: """ Retrieve the list of users who need to approve the merger of a given configuration commit. **Returns**: A list of ids belonging to the users that need to approve the configuration commit. """ endpoint_protocols = [1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_COMMITS_DATA, } ) request = RetrieveConfigurationCommitApproversRequest( commitId=bytes.fromhex(configuration_commit_id), scope=bytes.fromhex(scope_id), ) responses = self.send_request( GcgRequest(retrieveConfigurationCommitApproversRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("retrieveConfigurationCommitApproversResponse"): raise Exception( "Expected retrieveConfigurationCommitApproversResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.retrieveConfigurationCommitApproversResponse.approvers
Retrieve the list of users who need to approve the merger of a given configuration commit.
Returns: A list of ids belonging to the users that need to approve the configuration commit.
def generate_merge_approval_signature( self, configuration_commit_id: str ) -> bytes: """ Generate an approval signature required for merging a configuration commit. To merge a specific configuration commit, each user referenced in the list of ids returned by `retrieveConfigurationCommitApprovers` needs to generate an approval signature using this method. """ endpoint_protocols = [1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_COMMITS_DATA, } ) request = GenerateMergeApprovalSignatureRequest( commitId=bytes.fromhex(configuration_commit_id), scope=bytes.fromhex(scope_id), ) responses = self.send_request( GcgRequest(generateMergeApprovalSignatureRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("generateMergeApprovalSignatureResponse"): raise Exception( "Expected generateMergeApprovalSignatureResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.generateMergeApprovalSignatureResponse.signature
Generate an approval signature required for merging a configuration commit.
To merge a specific configuration commit, each user referenced in the list of ids returned by `retrieve_configuration_commit_approvers` needs to generate an approval signature using this method.
def merge_configuration_commit( self, configuration_commit_id: str, approval_signatures: Dict[str, bytes], ) -> MergeConfigurationCommitResponse: """ Request the enclave to merge the given configuration commit into the main data room configuration. **Parameters**: - `configuration_commit_id`: The id of the commit to be merged. - `approval_signatures`: A dictionary containing the approval signature for each of the required approvers, e.g. `{ "some@email.com": signature }`. """ endpoint_protocols = [1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_COMMITS_DATA, } ) request = MergeConfigurationCommitRequest( scope=bytes.fromhex(scope_id), commitId=bytes.fromhex(configuration_commit_id), approvalSignatures=approval_signatures, ) responses = self.send_request( GcgRequest(mergeConfigurationCommitRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("mergeConfigurationCommitResponse"): raise Exception( "Expected mergeConfigurationCommitResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.mergeConfigurationCommitResponse
Request the enclave to merge the given configuration commit into the main data room configuration. The full approval-and-merge flow is sketched below.

Parameters:
- `configuration_commit_id`: The id of the commit to be merged.
- `approval_signatures`: A dictionary containing the approval signature for each of the required approvers, e.g. `{ "some@email.com": signature }`.
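A sketch of that flow, assuming every approver uses the same `session` (in practice each approver would generate their signature in a session of their own):

approvers = session.retrieve_configuration_commit_approvers(commit_id)
approval_signatures = {
    email: session.generate_merge_approval_signature(commit_id)
    for email in approvers
}
session.merge_configuration_commit(commit_id, approval_signatures)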
def retrieve_data_room_configuration_history(
    self,
    data_room_id: str
) -> RetrieveDataRoomConfigurationHistoryResponse:
    """
    Retrieve the current merged data room configuration, as well as the
    history of configuration commits that have already been merged.
    """
    endpoint_protocols = [1]
    protocol = self._get_client_protocol(endpoint_protocols)
    request = RetrieveDataRoomConfigurationHistoryRequest(
        dataRoomId=bytes.fromhex(data_room_id),
    )
    responses = self.send_request(
        GcgRequest(retrieveDataRoomConfigurationHistoryRequest=request),
        protocol
    )
    if len(responses) != 1:
        raise Exception("Malformed response")
    response = responses[0]
    if not response.HasField("retrieveDataRoomConfigurationHistoryResponse"):
        raise Exception(
            "Expected retrieveDataRoomConfigurationHistoryResponse, got "
            + str(response.WhichOneof("gcg_response"))
        )
    return response.retrieveDataRoomConfigurationHistoryResponse
Retrieve the current merged data room configuration, as well as the history of configuration commits that have already been merged.
def retrieve_current_data_room_configuration(
    self,
    data_room_id: str
) -> Tuple[DataRoomConfiguration, str]:
    """
    Retrieve the current data room configuration, as well as the current
    "history pin".

    A history pin is the hash of all the ids of configuration commits that
    make up the structure of a data room. This pin therefore uniquely
    identifies a data room's structure at a certain point in time.
    A data room configuration, as well as its associated history pin, can
    be used to extend an existing data room (for example by adding new
    compute nodes). Extending an existing data room is done using the
    `DataRoomCommitBuilder` class.
    """
    response = self.retrieve_data_room_configuration_history(data_room_id)
    return (response.currentConfiguration, response.pin.hex())
Retrieve the current data room configuration, as well as the current "history pin".
A history pin is the hash of all the ids of configuration commits that make up the structure of a data room. This pin therefore uniquely identifies a data room's structure at a certain point in time. A data room configuration, as well as its associated history pin, can be used to extend an existing data room (for example by adding new compute nodes). Extending an existing data room is done using the `DataRoomCommitBuilder` class.
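For example, to prepare an extension of an existing data room (the `DataRoomCommitBuilder` constructor is documented further below):

configuration, history_pin = session.retrieve_current_data_room_configuration(data_room_id)
commit_builder = DataRoomCommitBuilder(
    data_room_id,
    configuration,
    history_pin,
    enclave_specs,
)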
def stop_data_room(self, data_room_id: str):
    """
    Stop the data room with the given id, making it impossible to run new
    computations.
    """
    self._update_data_room_status(data_room_id, DataRoomStatus.Value("Stopped"))
Stop the data room with the given id, making it impossible to run new computations.
def retrieve_data_room_status( self, data_room_id: str ) -> str: """ Returns the status of the data room. Valid values are `"Active"` or `"Stopped"`. """ endpoint_protocols = [0, 1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA, "data_room_id": data_room_id, } ) request = RetrieveDataRoomStatusRequest( scope=bytes.fromhex(scope_id), dataRoomId=bytes.fromhex(data_room_id), ) responses = self.send_request( GcgRequest(retrieveDataRoomStatusRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("retrieveDataRoomStatusResponse"): raise Exception( "Expected retrieveDataRoomStatusResponse, got " + str(response.WhichOneof("gcg_response")) ) return DataRoomStatus.Name(response.retrieveDataRoomStatusResponse.status)
Returns the status of the data room. Valid values are `"Active"` or `"Stopped"`.
def retrieve_data_room_definition( self, data_room_id: str ) -> RetrieveDataRoomResponse: """ Returns the underlying protobuf object for the data room. """ endpoint_protocols = [1] protocol = self._get_client_protocol(endpoint_protocols) request = RetrieveDataRoomRequest( dataRoomId=bytes.fromhex(data_room_id), ) responses = self.send_request( GcgRequest(retrieveDataRoomRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("retrieveDataRoomResponse"): raise Exception( "Expected retrieveDataRoomResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.retrieveDataRoomResponse
Returns the underlying protobuf object for the data room.
def retrieve_audit_log( self, data_room_id: str ) -> RetrieveAuditLogResponse: """ Returns the audit log for the data room. """ endpoint_protocols = [0, 1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA, "data_room_id": data_room_id, } ) request = RetrieveAuditLogRequest( scope=bytes.fromhex(scope_id), dataRoomId=bytes.fromhex(data_room_id), ) responses = self.send_request( GcgRequest(retrieveAuditLogRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("retrieveAuditLogResponse"): raise Exception( "Expected retrieveAuditLogResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.retrieveAuditLogResponse
Returns the audit log for the data room.
def publish_dataset( self, data_room_id: str, manifest_hash: str, leaf_id: str, key: Key, *, force: bool = False ) -> PublishDatasetToDataRoomResponse: """ Publishes a file and its encryption key to a data room. Neither the file or the encryption key will ever be stored in unencrypted form. This method will check whether the to-be-published file exists. If this is not the case, an exception will be raised. This behavior can be disabled by setting the `force` flag. In case the original client was created with platform integration enabled, the method will further check whether there already is a dataset published for the given data room. In this case, an exception will be thrown and the dataset will need to be unpublished first. """ endpoint_protocols = [0, 1] protocol = self._get_client_protocol(endpoint_protocols) dataset = self.client.get_dataset(manifest_hash) if not dataset and not force: raise Exception( "The dataset you are trying to publish does not exist" ) should_create_dataset_links =\ self.is_integrated_with_platform and \ self._is_web_data_room(data_room_id) if should_create_dataset_links: existing_link = self.platform._platform_api.get_dataset_link( data_room_id, self._convert_data_node_name_to_verifier_node(leaf_id) ) if existing_link: existing_dataset = existing_link["dataset"]["datasetId"] raise Exception( "The following dataset has already been published for this node." + " Please unpublish this dataset first." + f" Dataset: '{existing_dataset}'" ) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA, "data_room_id": data_room_id, } ) request = PublishDatasetToDataRoomRequest( dataRoomId=bytes.fromhex(data_room_id), datasetHash=bytes.fromhex(manifest_hash), leafName=leaf_id, encryptionKey=key.material, scope=bytes.fromhex(scope_id) ) responses = self.send_request( GcgRequest(publishDatasetToDataRoomRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("publishDatasetToDataRoomResponse"): raise Exception( "Expected publishDatasetToDataRoomResponse, got " + str(response.WhichOneof("gcg_response")) ) else: if should_create_dataset_links: self.platform._platform_api.create_dataset_link( data_room_id, manifest_hash, self._convert_data_node_name_to_verifier_node(leaf_id) ) return response.publishDatasetToDataRoomResponse
Publishes a file and its encryption key to a data room. Neither the file nor the encryption key will ever be stored in unencrypted form.
This method will check whether the to-be-published file exists. If this is not the case, an exception will be raised. This behavior can be disabled by setting the `force` flag.
In case the original client was created with platform integration enabled, the method will further check whether there already is a dataset published for the given data room. In this case, an exception will be thrown and the dataset will need to be unpublished first.
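A sketch of the publishing flow. The `Key` constructor and the `upload_dataset` call are assumptions about the `Client` API; check the `Client` documentation for the exact names. The leaf id is whatever `add_data_node` returned when the data room was built.

from decentriq_platform import Key

key = Key()  # assumed: generates a fresh encryption key
with open("customers.csv", "rb") as data:
    manifest_hash = client.upload_dataset(data, key, "customers.csv")  # assumed signature
session.publish_dataset(data_room_id, manifest_hash, "customers_leaf", key)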
def remove_published_dataset( self, data_room_id: str, leaf_id: str, ) -> RemovePublishedDatasetResponse: """ Removes a published dataset from the data room. """ endpoint_protocols = [0, 1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA, "data_room_id": data_room_id, } ) request = RemovePublishedDatasetRequest( scope=bytes.fromhex(scope_id), dataRoomId=bytes.fromhex(data_room_id), leafName=leaf_id, ) responses = self.send_request( GcgRequest(removePublishedDatasetRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("removePublishedDatasetResponse"): raise Exception( "Expected removePublishedDatasetResponse, got " + str(response.WhichOneof("gcg_response")) ) else: if self.is_integrated_with_platform and self._is_web_data_room(data_room_id): try: self.platform._platform_api.delete_dataset_link( data_room_id, self._convert_data_node_name_to_verifier_node(leaf_id) ) except Exception as error: print(f"Error when deleting dataset link on platform: {error}") return response.removePublishedDatasetResponse
Removes a published dataset from the data room.
def retrieve_published_datasets(
    self,
    data_room_id: str,
) -> RetrievePublishedDatasetsResponse:
    """
    Returns the datasets published to the given data room.
    """
    endpoint_protocols = [0, 1]
    protocol = self._get_client_protocol(endpoint_protocols)
    scope_id = self.client._ensure_scope_with_metadata(
        self.email,
        {
            "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
            "data_room_id": data_room_id,
        }
    )
    request = RetrievePublishedDatasetsRequest(
        scope=bytes.fromhex(scope_id),
        dataRoomId=bytes.fromhex(data_room_id),
    )
    responses = self.send_request(
        GcgRequest(retrievePublishedDatasetsRequest=request),
        protocol
    )
    if len(responses) != 1:
        raise Exception("Malformed response")
    response = responses[0]
    if not response.HasField("retrievePublishedDatasetsResponse"):
        raise Exception(
            "Expected retrievePublishedDatasetsResponse, got "
            + str(response.WhichOneof("gcg_response"))
        )
    return response.retrievePublishedDatasetsResponse
Returns the datasets published to the given data room.
def get_computation_status(self, job_id: str) -> JobStatusResponse:
    """
    Returns the status of the provided `job_id`, which will include the
    names of the nodes that completed their execution.
    """
    endpoint_protocols = [0, 1]
    protocol = self._get_client_protocol(endpoint_protocols)
    request = JobStatusRequest(
        jobId=bytes.fromhex(job_id),
    )
    responses = self.send_request(
        GcgRequest(jobStatusRequest=request),
        protocol
    )
    if len(responses) != 1:
        raise Exception("Malformed response")
    response = responses[0]
    if not response.HasField("jobStatusResponse"):
        raise Exception(
            "Expected jobStatusResponse, got "
            + str(response.WhichOneof("gcg_response"))
        )
    return response.jobStatusResponse
Returns the status of the provided `job_id`, which will include the names of the nodes that completed their execution.
def run_computation(
    self,
    data_room_id: str,
    compute_node_id: str,
    /,
    *,
    dry_run: bool = False,
) -> JobId:
    """
    Run a specific computation within the data room with the given id.

    The result will be an identifier object of the job executing the computation.
    This object is required for checking a job's status and retrieving its results.
    """
    response = self._submit_compute(data_room_id, [compute_node_id], dry_run=dry_run)
    return JobId(response.jobId.hex(), compute_node_id)
Run a specific computation within the data room with the given id.
The result will be an identifier object of the job executing the computation. This object is required for checking a job's status and retrieving its results.
def wait_until_computation_has_finished( self, job_id: JobId, /, *, interval: int = 5, timeout: int = None ): """ Wait for the given job to complete. The method will check for the job's completeness every `interval` seconds and up to an optional `timeout` seconds after which the method will raise an exception. """ elapsed = 0 while True: if timeout is not None and elapsed > timeout: raise Exception( f"Timeout when trying to get result for job {job_id.id} of" f" {job_id.compute_node_id} (waited {timeout} seconds)" ) elif job_id.compute_node_id in self.get_computation_status(job_id.id).completeComputeNodeNames: break else: sleep(interval) elapsed += interval
Wait for the given job to complete.
The method will check for the job's completeness every `interval` seconds and up to an optional `timeout` seconds, after which the method will raise an exception.
def run_dev_computation( self, configuration_commit_id: str, compute_node_id: str, /, *, dry_run: bool = False, ) -> JobId: """ Run a specific computation within the context of the data room configuration defined by the given commit id. Such "development" computations can also be run for configuration commits that have not yet been merged. The result will be an identifier object of the job executing the computation. This object is required for checking a job's status and retrieving its results. """ response = self._submit_dev_compute(configuration_commit_id, [compute_node_id], dry_run=dry_run) return JobId(response.jobId.hex(), compute_node_id)
Run a specific computation within the context of the data room configuration defined by the given commit id. Such "development" computations can also be run for configuration commits that have not yet been merged.
The result will be an identifier object of the job executing the computation. This object is required for checking a job's status and retrieving its results.
def get_computation_result(
    self,
    job_id: JobId,
    /,
    *,
    interval: int = 5,
    timeout: int = None
) -> bytes:
    """
    Wait for the given job to complete and retrieve its results as a raw byte string.

    The method will check for the job's completeness every `interval` seconds and up
    to an optional `timeout` seconds after which the method will raise an exception.
    If the job completes and the results can be retrieved successfully, a raw byte
    string will be returned. The byte string can be transformed into a more useful
    object using a variety of helper methods. These helper methods are specific to
    the type of computation you ran and can be found in the corresponding packages.
    """
    job_id_bytes = bytes.fromhex(job_id.id)
    # `wait_until_computation_has_finished` performs the polling; once it
    # returns, the job's results can be fetched directly.
    self.wait_until_computation_has_finished(
        job_id,
        interval=interval,
        timeout=timeout
    )
    results = self._get_job_results(job_id_bytes, job_id.compute_node_id)
    return results
Wait for the given job to complete and retrieve its results as a raw byte string.
The method will check for the job's completeness every `interval` seconds and up to an optional `timeout` seconds, after which the method will raise an exception.
If the job completes and the results can be retrieved successfully, a raw byte string will be returned. The byte string can be transformed into a more useful object using a variety of helper methods. These helper methods are specific to the type of computation you ran and can be found in the corresponding packages.
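Putting the pieces together, a typical job lifecycle looks like this (the compute node id is a placeholder for an id assigned when the data room was built):

job_id = session.run_computation(data_room_id, "my_compute_node")
raw_result = session.get_computation_result(job_id, interval=5, timeout=600)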
def run_computation_and_get_results(
    self,
    data_room_id: str,
    compute_node_id: str,
    /,
    *,
    interval: int = 5,
    timeout: int = None
) -> Optional[bytes]:
    """
    Run a specific computation and return its results.

    This method is simply a wrapper for running `run_computation` and
    `get_computation_result` directly after each other.
    """
    job_id = self.run_computation(data_room_id, compute_node_id)
    return self.get_computation_result(
        job_id,
        interval=interval,
        timeout=timeout
    )
Run a specific computation and return its results.
This method is simply a wrapper for running `run_computation` and `get_computation_result` directly after each other.
Provider of a list of convenience methods to interact with the Decentriq platform.
This field exposes an object that provides a set of convenience features known from the Decentriq web platform. These include, for example, getting the list of data sets that are also visible in the browser-based platform UI.
class DataRoomBuilder(): """ A helper class to ease the building process of a data clean room. """ name: str governance_protocol: GovernanceProtocolProto description: Optional[str] owner_email: Optional[str] modifications_builder: DataRoomModificationsBuilder def __init__( self, name: str, enclave_specs: Dict[str, EnclaveSpecification], governance_protocol: GovernanceProtocolProto = GovernanceProtocol.static(), *, add_basic_user_permissions: bool = True, description: str = None, owner_email: str = None, ) -> None: """ Create a data room builder object. **Parameters**: - `name`: The name of the data room to be created. - `enclave_specs`: The enclave specification set in which to look-up enclave specs for enclaves responsible for executing compute nodes. These specs are provided by the `decentriq_platform.enclave_specifications` catalogue. - `governance_protocol`: The protocol that defines whether and how a data room can be changed after it has been published. - `add_basic_user_permissions`: Whether to add basic user permissions for each participant. These are: 1. Permission to retrieve the data room definition 2. Permission to retrieve the status of the data room 3. Permission to retrieve the audit log 4. Permission to retrieve the list of datasets that have been published to the data room 5. Permission to run development computations - `description`: Description of the data room. - `owner_email`: A custom owner of the data room. By default this will be set to the owner of the session publishing the data room. """ assert name, "The DCR must have a non-empty name" self.modifications_builder = DataRoomModificationsBuilder( enclave_specs, add_basic_user_permissions=add_basic_user_permissions ) self.name = name self.owner_email = owner_email self.description = description self.governance_protocol = governance_protocol self.enclave_specs = enclave_specs if description: self.add_description(description) def add_data_node( self, name: str, is_required: bool = False, node_id: Optional[str] = None ) -> str: """ Add a new data node. If the node is marked as required, any computation which includes it as a dependency will not start in case no data has been provided yet. **Parameters**: - `name`: Name of the data node. - `is_required`: If true, any computation which depends on this data node can only be run if data has been provided for this node. - `node_id`: A custom identifier for this node. If not specified, the identifier is generated automatically. **Returns**: The id that was assigned to the added data node. This id will be needed when connecting a dataset or when permissions condering this node are defined. """ return self.modifications_builder.add_data_node( name, is_required=is_required, node_id=node_id ) def add_compute_node( self, node: Node, node_id: Optional[str] = None, attestation_id: Optional[str] = None ) -> str: """ Add a new compute node. Specific compute node classes are provided either by the main package or by the respective compute submodules. **Parameters**: - `node`: The node object to add. - `node_id`: A customer identifier for this node. If not specified, the identifier is generated automatically. - `attestation_id`: An identifier for a concrete attestation specification to use for this compute node. If not specified, the specification is chosen automatically based on the type of compute node. **Returns**: The id that was assigned to the added compute node. This id is needed when running computations or when adding permissions concerning this node. 
""" return self.modifications_builder.add_compute_node( node, node_id=node_id, attestation_id=attestation_id ) def add_user_permission( self, email: str, authentication_method: AuthenticationMethod, permissions: List[Permission], ): """ Add permissions for a user. The authentication is performed on the enclave side based on the method supplied. """ return self.modifications_builder.add_user_permission( email, authentication_method, permissions ) def add_description(self, description: str): """Add a description to the data room being built.""" self.description = description def add_owner_email(self, email: str): """ Specify a specific owner of the data room. By default, the current user will be used. """ self.owner_email = email def build(self) -> Tuple[DataRoom, List[ConfigurationModification]]: """ Finalize data room contruction. Built data rooms still need to be published by a `decentriq_platform.Session` before they can be interacted with. **Returns**: A tuple containing the following elements: (1) A data room object that stores the core properties of the DCR, such as its name, its owner, and the government protocol that defines what changes can be made to the data room's configuration. (2) A list of the recorded modifications that will be applied to the initially empty data room configuration within the enclave. """ data_room = DataRoom() data_room.name = self.name if self.owner_email: data_room.ownerEmail = self.owner_email if self.description: data_room.description = self.description data_room.id = DataRoomBuilder._generate_id() data_room.governanceProtocol.CopyFrom(self.governance_protocol) modifications = self.modifications_builder.build() return data_room, modifications @staticmethod def _generate_id(): return str(uuid.uuid4())
A helper class to ease the building process of a data clean room.
def __init__( self, name: str, enclave_specs: Dict[str, EnclaveSpecification], governance_protocol: GovernanceProtocolProto = GovernanceProtocol.static(), *, add_basic_user_permissions: bool = True, description: str = None, owner_email: str = None, ) -> None: """ Create a data room builder object. **Parameters**: - `name`: The name of the data room to be created. - `enclave_specs`: The enclave specification set in which to look-up enclave specs for enclaves responsible for executing compute nodes. These specs are provided by the `decentriq_platform.enclave_specifications` catalogue. - `governance_protocol`: The protocol that defines whether and how a data room can be changed after it has been published. - `add_basic_user_permissions`: Whether to add basic user permissions for each participant. These are: 1. Permission to retrieve the data room definition 2. Permission to retrieve the status of the data room 3. Permission to retrieve the audit log 4. Permission to retrieve the list of datasets that have been published to the data room 5. Permission to run development computations - `description`: Description of the data room. - `owner_email`: A custom owner of the data room. By default this will be set to the owner of the session publishing the data room. """ assert name, "The DCR must have a non-empty name" self.modifications_builder = DataRoomModificationsBuilder( enclave_specs, add_basic_user_permissions=add_basic_user_permissions ) self.name = name self.owner_email = owner_email self.description = description self.governance_protocol = governance_protocol self.enclave_specs = enclave_specs if description: self.add_description(description)
Create a data room builder object. A usage sketch follows the parameter list below.

Parameters:
- `name`: The name of the data room to be created.
- `enclave_specs`: The enclave specification set in which to look up enclave specs for enclaves responsible for executing compute nodes. These specs are provided by the `decentriq_platform.enclave_specifications` catalogue.
- `governance_protocol`: The protocol that defines whether and how a data room can be changed after it has been published.
- `add_basic_user_permissions`: Whether to add basic user permissions for each participant. These are:
  - Permission to retrieve the data room definition
  - Permission to retrieve the status of the data room
  - Permission to retrieve the audit log
  - Permission to retrieve the list of datasets that have been published to the data room
  - Permission to run development computations
- `description`: Description of the data room.
- `owner_email`: A custom owner of the data room. By default this will be set to the owner of the session publishing the data room.
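A sketch of the builder in action. The compute node object (`my_sql_node`) and the authentication method (`auth_method`) are placeholders: concrete node classes come from the compute submodules (e.g. `decentriq_platform.sql`), and authentication methods are described in the `AuthenticationMethod` documentation.

builder = DataRoomBuilder("Example DCR", enclave_specs=enclave_specs)
data_node_id = builder.add_data_node("customers", is_required=True)
compute_node_id = builder.add_compute_node(my_sql_node)  # placeholder node object
builder.add_user_permission(
    email="analyst@example.com",
    authentication_method=auth_method,  # placeholder
    permissions=[
        Permissions.leaf_crud(data_node_id),
        Permissions.execute_compute(compute_node_id),
        Permissions.retrieve_data_room(),
    ],
)
data_room, modifications = builder.build()
data_room_id = session.publish_data_room((data_room, modifications))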
def add_data_node( self, name: str, is_required: bool = False, node_id: Optional[str] = None ) -> str: """ Add a new data node. If the node is marked as required, any computation which includes it as a dependency will not start in case no data has been provided yet. **Parameters**: - `name`: Name of the data node. - `is_required`: If true, any computation which depends on this data node can only be run if data has been provided for this node. - `node_id`: A custom identifier for this node. If not specified, the identifier is generated automatically. **Returns**: The id that was assigned to the added data node. This id will be needed when connecting a dataset or when permissions condering this node are defined. """ return self.modifications_builder.add_data_node( name, is_required=is_required, node_id=node_id )
Add a new data node. If the node is marked as required, any computation which includes it as a dependency will not start in case no data has been provided yet.
Parameters:
- `name`: Name of the data node.
- `is_required`: If true, any computation which depends on this data node can only be run if data has been provided for this node.
- `node_id`: A custom identifier for this node. If not specified, the identifier is generated automatically.

Returns: The id that was assigned to the added data node. This id will be needed when connecting a dataset or when permissions concerning this node are defined.
def add_compute_node( self, node: Node, node_id: Optional[str] = None, attestation_id: Optional[str] = None ) -> str: """ Add a new compute node. Specific compute node classes are provided either by the main package or by the respective compute submodules. **Parameters**: - `node`: The node object to add. - `node_id`: A customer identifier for this node. If not specified, the identifier is generated automatically. - `attestation_id`: An identifier for a concrete attestation specification to use for this compute node. If not specified, the specification is chosen automatically based on the type of compute node. **Returns**: The id that was assigned to the added compute node. This id is needed when running computations or when adding permissions concerning this node. """ return self.modifications_builder.add_compute_node( node, node_id=node_id, attestation_id=attestation_id )
Add a new compute node. Specific compute node classes are provided either by the main package or by the respective compute submodules.
Parameters:
- `node`: The node object to add.
- `node_id`: A custom identifier for this node. If not specified, the identifier is generated automatically.
- `attestation_id`: An identifier for a concrete attestation specification to use for this compute node. If not specified, the specification is chosen automatically based on the type of compute node.

Returns: The id that was assigned to the added compute node. This id is needed when running computations or when adding permissions concerning this node.
def add_user_permission( self, email: str, authentication_method: AuthenticationMethod, permissions: List[Permission], ): """ Add permissions for a user. The authentication is performed on the enclave side based on the method supplied. """ return self.modifications_builder.add_user_permission( email, authentication_method, permissions )
Add permissions for a user. The authentication is performed on the enclave side based on the method supplied.
def add_description(self, description: str):
    """Add a description to the data room being built."""
    self.description = description
Add a description to the data room being built.
def add_owner_email(self, email: str):
    """
    Specify a specific owner of the data room. By default, the current
    user will be used.
    """
    self.owner_email = email
Specify a specific owner of the data room. By default, the current user will be used.
def build(self) -> Tuple[DataRoom, List[ConfigurationModification]]: """ Finalize data room contruction. Built data rooms still need to be published by a `decentriq_platform.Session` before they can be interacted with. **Returns**: A tuple containing the following elements: (1) A data room object that stores the core properties of the DCR, such as its name, its owner, and the government protocol that defines what changes can be made to the data room's configuration. (2) A list of the recorded modifications that will be applied to the initially empty data room configuration within the enclave. """ data_room = DataRoom() data_room.name = self.name if self.owner_email: data_room.ownerEmail = self.owner_email if self.description: data_room.description = self.description data_room.id = DataRoomBuilder._generate_id() data_room.governanceProtocol.CopyFrom(self.governance_protocol) modifications = self.modifications_builder.build() return data_room, modifications
Finalize data room construction.
Built data rooms still need to be published by a `decentriq_platform.Session` before they can be interacted with.
Returns: A tuple containing the following elements: (1) A data room object that stores the core properties of the DCR, such as its name, its owner, and the governance protocol that defines what changes can be made to the data room's configuration. (2) A list of the recorded modifications that will be applied to the initially empty data room configuration within the enclave.
class DataRoomCommitBuilder: """ A helper class to build a data room configuration commit, i.e. a list of modifications that are to be applied to the configuration of an existing data room. """ data_room_id: str history_pin: str modifications_builder: DataRoomModificationsBuilder def __init__( self, data_room_id: str, current_configuration: DataRoomConfiguration, history_pin: str, enclave_specs: Dict[str, EnclaveSpecification], *, add_basic_user_permissions: bool = False, ): """ Construct a builder object for constructing new data room configuration commits. A configuraton commit contains a list of modifications that can be applied to existing data room configuration. **Parameters**: - `data_room_id`: The data room to which the modifications should be applied. - `data_room_configuration`: The current configuration of the data room to be altered. - `history_pin`: The current history pin that identifies a specific point in time within a line of configuration changes. - `enclave_specs`: The enclave specification set in which to look-up enclave specs for enclaves responsible for executing compute nodes. These specs are provided by the `decentriq_platform.enclave_specifications` catalogue. - `add_basic_user_permissions`: Whether to add basic user permissions for each participant. These are: 1. Permission to retrieve the data room definition 2. Permission to retrieve the status of the data room 3. Permission to retrieve the audit log 4. Permission to retrieve the list of datasets that have been published to the data room 5. Permission to run development computations """ self.data_room_id = data_room_id self.history_pin = history_pin self.modifications_builder = DataRoomModificationsBuilder( enclave_specs, data_room_id=data_room_id, history_pin=history_pin, data_room_configuration=current_configuration, add_basic_user_permissions=add_basic_user_permissions, ) def add_data_node( self, name: str, is_required: bool = False, node_id: Optional[str] = None ) -> str: """ Add a new data node. If the node is marked as required, any computation which includes it as a dependency will not start in case no data has been provided yet. **Parameters**: - `name`: Name of the data node. - `is_required`: If true, any computation which depends on this data node can only be run if data has been provided for this node. - `node_id`: A custom identifier for this node. If not specified, the identifier is generated automatically. **Returns**: The id that was assigned to the added data node. This id will be needed when connecting a dataset or when permissions condering this node are defined. """ return self.modifications_builder.add_data_node( name, is_required=is_required, node_id=node_id ) def change_attestation_specification( self, attestation_id: str, attestation_specification: AttestationSpecification ) -> str: self.modifications_builder.change_attestation_specification( attestation_id, attestation_specification ) def add_compute_node( self, node: Node, node_id: Optional[str] = None, attestation_id: Optional[str] = None ) -> str: """ Add a new compute node. Specific compute node classes are provided either by the main package or by the respective compute submodules. **Parameters**: - `node`: The node object to add. - `node_id`: A customer identifier for this node. If not specified, the identifier is generated automatically. - `attestation_id`: An identifier for a concrete attestation specification to use for this compute node. 
If not specified, the specification is chosen automatically based on the type of compute node. **Returns**: The id that was assigned to the added compute node. This id is needed when running computations or when adding permissions concerning this node. """ return self.modifications_builder.add_compute_node( node, node_id=node_id, attestation_id=attestation_id ) def add_user_permission( self, email: str, authentication_method: AuthenticationMethod, permissions: List[Permission], ): """ Add permissions for a user. The authentication is performed on the enclave side based on the method supplied. """ return self.modifications_builder.add_user_permission( email, authentication_method, permissions ) def build(self): """ Build the data room configuration commit. The built commit still needs to be published and merged in order for it to be made part of the data room configuration. """ return ConfigurationCommit( dataRoomId=bytes.fromhex(self.data_room_id), dataRoomHistoryPin=bytes.fromhex(self.history_pin), modifications=self.modifications_builder.build() )
A helper class to build a data room configuration commit, i.e. a list of modifications that are to be applied to the configuration of an existing data room.
def __init__( self, data_room_id: str, current_configuration: DataRoomConfiguration, history_pin: str, enclave_specs: Dict[str, EnclaveSpecification], *, add_basic_user_permissions: bool = False, ): """ Construct a builder object for constructing new data room configuration commits. A configuraton commit contains a list of modifications that can be applied to existing data room configuration. **Parameters**: - `data_room_id`: The data room to which the modifications should be applied. - `data_room_configuration`: The current configuration of the data room to be altered. - `history_pin`: The current history pin that identifies a specific point in time within a line of configuration changes. - `enclave_specs`: The enclave specification set in which to look-up enclave specs for enclaves responsible for executing compute nodes. These specs are provided by the `decentriq_platform.enclave_specifications` catalogue. - `add_basic_user_permissions`: Whether to add basic user permissions for each participant. These are: 1. Permission to retrieve the data room definition 2. Permission to retrieve the status of the data room 3. Permission to retrieve the audit log 4. Permission to retrieve the list of datasets that have been published to the data room 5. Permission to run development computations """ self.data_room_id = data_room_id self.history_pin = history_pin self.modifications_builder = DataRoomModificationsBuilder( enclave_specs, data_room_id=data_room_id, history_pin=history_pin, data_room_configuration=current_configuration, add_basic_user_permissions=add_basic_user_permissions, )
Construct a builder object for constructing new data room configuration commits. A configuration commit contains a list of modifications that can be applied to an existing data room configuration. The full extension workflow is sketched below.

Parameters:
- `data_room_id`: The data room to which the modifications should be applied.
- `current_configuration`: The current configuration of the data room to be altered.
- `history_pin`: The current history pin that identifies a specific point in time within a line of configuration changes.
- `enclave_specs`: The enclave specification set in which to look up enclave specs for enclaves responsible for executing compute nodes. These specs are provided by the `decentriq_platform.enclave_specifications` catalogue.
- `add_basic_user_permissions`: Whether to add basic user permissions for each participant. These are:
  - Permission to retrieve the data room definition
  - Permission to retrieve the status of the data room
  - Permission to retrieve the audit log
  - Permission to retrieve the list of datasets that have been published to the data room
  - Permission to run development computations
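As an illustration of the full extension workflow (with `my_new_node` standing in for a concrete compute node object from one of the compute submodules):

configuration, pin = session.retrieve_current_data_room_configuration(data_room_id)
commit_builder = DataRoomCommitBuilder(data_room_id, configuration, pin, enclave_specs)
new_node_id = commit_builder.add_compute_node(my_new_node)  # placeholder node object
commit = commit_builder.build()
commit_id = session.publish_data_room_configuration_commit(commit)

# Development computations can be run against the commit before it is merged:
job_id = session.run_dev_computation(commit_id, new_node_id)
result = session.get_computation_result(job_id)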
def add_data_node( self, name: str, is_required: bool = False, node_id: Optional[str] = None ) -> str: """ Add a new data node. If the node is marked as required, any computation which includes it as a dependency will not start in case no data has been provided yet. **Parameters**: - `name`: Name of the data node. - `is_required`: If true, any computation which depends on this data node can only be run if data has been provided for this node. - `node_id`: A custom identifier for this node. If not specified, the identifier is generated automatically. **Returns**: The id that was assigned to the added data node. This id will be needed when connecting a dataset or when permissions condering this node are defined. """ return self.modifications_builder.add_data_node( name, is_required=is_required, node_id=node_id )
Add a new data node. If the node is marked as required, any computation which includes it as a dependency will not start in case no data has been provided yet.
Parameters:
- `name`: Name of the data node.
- `is_required`: If true, any computation which depends on this data node can only be run if data has been provided for this node.
- `node_id`: A custom identifier for this node. If not specified, the identifier is generated automatically.

Returns: The id that was assigned to the added data node. This id will be needed when connecting a dataset or when permissions concerning this node are defined.
def change_attestation_specification(
    self,
    attestation_id: str,
    attestation_specification: AttestationSpecification
) -> str:
    # Forward to the underlying modifications builder and propagate its
    # return value, matching the declared `-> str` return type.
    return self.modifications_builder.change_attestation_specification(
        attestation_id,
        attestation_specification
    )
def add_compute_node( self, node: Node, node_id: Optional[str] = None, attestation_id: Optional[str] = None ) -> str: """ Add a new compute node. Specific compute node classes are provided either by the main package or by the respective compute submodules. **Parameters**: - `node`: The node object to add. - `node_id`: A customer identifier for this node. If not specified, the identifier is generated automatically. - `attestation_id`: An identifier for a concrete attestation specification to use for this compute node. If not specified, the specification is chosen automatically based on the type of compute node. **Returns**: The id that was assigned to the added compute node. This id is needed when running computations or when adding permissions concerning this node. """ return self.modifications_builder.add_compute_node( node, node_id=node_id, attestation_id=attestation_id )
Add a new compute node. Specific compute node classes are provided either by the main package or by the respective compute submodules.
Parameters:
- `node`: The node object to add.
- `node_id`: A custom identifier for this node. If not specified, the identifier is generated automatically.
- `attestation_id`: An identifier for a concrete attestation specification to use for this compute node. If not specified, the specification is chosen automatically based on the type of compute node.

Returns: The id that was assigned to the added compute node. This id is needed when running computations or when adding permissions concerning this node.
def add_user_permission( self, email: str, authentication_method: AuthenticationMethod, permissions: List[Permission], ): """ Add permissions for a user. The authentication is performed on the enclave side based on the method supplied. """ return self.modifications_builder.add_user_permission( email, authentication_method, permissions )
Add permissions for a user. The authentication is performed on the enclave side based on the method supplied.
def build(self):
    """
    Build the data room configuration commit. The built commit still needs
    to be published and merged in order for it to be made part of the data
    room configuration.
    """
    return ConfigurationCommit(
        dataRoomId=bytes.fromhex(self.data_room_id),
        dataRoomHistoryPin=bytes.fromhex(self.history_pin),
        modifications=self.modifications_builder.build()
    )
Build the data room configuration commit.
The built commit still needs to be published and merged in order for it to be made part of the data room configuration.
class Permissions: """Helper class for creating data room permissions.""" def __init__(self): """This class is not meant to be instantiated.""" @staticmethod def leaf_crud(leaf_node_name: str) -> Permission: """Permission required for publishing a dataset to a data room.""" return Permission( leafCrudPermission=LeafCrudPermission(leafNodeName=leaf_node_name) ) @staticmethod def retrieve_data_room() -> Permission: """Permission required for retrieving a data room's definition after it has been published.""" return Permission(retrieveDataRoomPermission=RetrieveDataRoomPermission()) @staticmethod def retrieve_audit_log() -> Permission: """Permission for retrieving the audit log, a log detailing all past interactions with the data room.""" return Permission(retrieveAuditLogPermission=RetrieveAuditLogPermission()) @staticmethod def execute_compute(compute_node_name: str) -> Permission: """Permission for executing the computation with the given name.""" return Permission( executeComputePermission=ExecuteComputePermission( computeNodeName=compute_node_name ) ) @staticmethod def retrieve_data_room_status() -> Permission: """Permission for retrieving the status of a data room.""" return Permission( retrieveDataRoomStatusPermission=RetrieveDataRoomStatusPermission() ) @staticmethod def update_data_room_status() -> Permission: """Permission for updating the status of a data room (e.g. irreversibly stopping it).""" return Permission( updateDataRoomStatusPermission=UpdateDataRoomStatusPermission() ) @staticmethod def retrieve_published_datasets() -> Permission: """Permission for retrieving the list of datasets that has been published to the data room.""" return Permission( retrievePublishedDatasetsPermission=RetrievePublishedDatasetsPermission() ) @staticmethod def dry_run() -> Permission: """Permission for triggering a dry run on the data room.""" return Permission(dryRunPermission=DryRunPermission()) @staticmethod def generate_merge_signature() -> Permission: """Permission for generating signatures required for merge approvals.""" return Permission( generateMergeSignaturePermission=GenerateMergeSignaturePermission() ) @staticmethod def execute_development_compute() -> Permission: """Permission for executing computations in development mode.""" return Permission( executeDevelopmentComputePermission=ExecuteDevelopmentComputePermission() ) @staticmethod def merge_configuration_commit() -> Permission: """ Permission for merging configuration commits into the current data room configuration. """ return Permission( mergeConfigurationCommitPermission=MergeConfigurationCommitPermission() )
Helper class for creating data room permissions.
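For illustration, a data owner's permission set might be assembled as follows (the node names are placeholders):

```
# Sketch: permissions typically granted to a data owner.
permissions = [
    Permissions.leaf_crud("my_data_node"),          # publish datasets
    Permissions.retrieve_data_room(),               # inspect the DCR definition
    Permissions.retrieve_published_datasets(),      # list published datasets
    Permissions.execute_compute("my_computation"),  # run an authorized computation
]
```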
View Source
def __init__(self): """This class is not meant to be instantiated."""
This class is not meant to be instantiated.
View Source
@staticmethod def leaf_crud(leaf_node_name: str) -> Permission: """Permission required for publishing a dataset to a data room.""" return Permission( leafCrudPermission=LeafCrudPermission(leafNodeName=leaf_node_name) )
Permission required for publishing a dataset to a data room.
View Source
@staticmethod def retrieve_data_room() -> Permission: """Permission required for retrieving a data room's definition after it has been published.""" return Permission(retrieveDataRoomPermission=RetrieveDataRoomPermission())
Permission required for retrieving a data room's definition after it has been published.
View Source
@staticmethod def retrieve_audit_log() -> Permission: """Permission for retrieving the audit log, a log detailing all past interactions with the data room.""" return Permission(retrieveAuditLogPermission=RetrieveAuditLogPermission())
Permission for retrieving the audit log, a log detailing all past interactions with the data room.
View Source
@staticmethod def execute_compute(compute_node_name: str) -> Permission: """Permission for executing the computation with the given name.""" return Permission( executeComputePermission=ExecuteComputePermission( computeNodeName=compute_node_name ) )
Permission for executing the computation with the given name.
View Source
@staticmethod def retrieve_data_room_status() -> Permission: """Permission for retrieving the status of a data room.""" return Permission( retrieveDataRoomStatusPermission=RetrieveDataRoomStatusPermission() )
Permission for retrieving the status of a data room.
View Source
@staticmethod def update_data_room_status() -> Permission: """Permission for updating the status of a data room (e.g. irreversibly stopping it).""" return Permission( updateDataRoomStatusPermission=UpdateDataRoomStatusPermission() )
Permission for updating the status of a data room (e.g. irreversibly stopping it).
View Source
@staticmethod def retrieve_published_datasets() -> Permission: """Permission for retrieving the list of datasets that have been published to the data room.""" return Permission( retrievePublishedDatasetsPermission=RetrievePublishedDatasetsPermission() )
Permission for retrieving the list of datasets that have been published to the data room.
View Source
@staticmethod def dry_run() -> Permission: """Permission for triggering a dry run on the data room.""" return Permission(dryRunPermission=DryRunPermission())
Permission for triggering a dry run on the data room.
View Source
@staticmethod def generate_merge_signature() -> Permission: """Permission for generating signatures required for merge approvals.""" return Permission( generateMergeSignaturePermission=GenerateMergeSignaturePermission() )
Permission for generating signatures required for merge approvals.
View Source
@staticmethod def execute_development_compute() -> Permission: """Permission for executing computations in development mode.""" return Permission( executeDevelopmentComputePermission=ExecuteDevelopmentComputePermission() )
Permission for executing computations in development mode.
View Source
@staticmethod def merge_configuration_commit() -> Permission: """ Permission for merging configuration commits into the current data room configuration. """ return Permission( mergeConfigurationCommitPermission=MergeConfigurationCommitPermission() )
Permission for merging configuration commits into the current data room configuration.
View Source
class GovernanceProtocol: """ The protocol that defines whether and how a data room can be changed after it has been published. """ @staticmethod def static(): """ The data room cannot be changed after it has been published. Participants are still allowed to execute development computations as long as the required permissions have been granted. """ return GovernanceProtocolProto( staticDataRoomPolicy=StaticDataRoomPolicy() ) @staticmethod def affected_data_owners_approve(): """ The addition of compute nodes must be approved by all data owners on whose data the new compute node will depend. """ return GovernanceProtocolProto( affectedDataOwnersApprovePolicy=AffectedDataOwnersApprovePolicy() )
The protocol that defines whether and how a data room can be changed after it has been published.
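Both policies are obtained from static constructor methods. A minimal sketch of selecting one when defining a data room:

```
# Sketch: pick the governance protocol for a new data room.
protocol = GovernanceProtocol.static()
# or, to allow additions that the affected data owners approve:
protocol = GovernanceProtocol.affected_data_owners_approve()
```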
View Source
@staticmethod def static(): """ The data room cannot be changed after it has been published. Participants are still allowed to execute development computations as long as the required permissions have been granted. """ return GovernanceProtocolProto( staticDataRoomPolicy=StaticDataRoomPolicy() )
The data room cannot be changed after it has been published. Participants are still allowed to execute development computations as long as the required permissions have been granted.
View Source
@staticmethod def affected_data_owners_approve(): """ The addition of compute nodes must be approved by all data owners on whose data the new compute node will depend. """ return GovernanceProtocolProto( affectedDataOwnersApprovePolicy=AffectedDataOwnersApprovePolicy() )
The addition of compute nodes must be approved by all data owners on whose data the new compute node will depend.
View Source
class EnclaveSpecifications: """ Provider of the enclave specifications available on the Decentriq platform. Enclave specifications enable you to express which particular enclaves you trust. The field containing the measurement (e.g. `mrenclave` in the case of Intel SGX) identifies the exact binary that will process your data. Users of the Decentriq platform are encouraged to reproduce this value by building the enclave binary from audited source code and re-producing the measurement (in the case of Intel SGX, this would involve simply hashing the produced executable). When connecting to the driver enclave, the configured attestation algorithm will guarantee that the enclave you connect to is the one corresponding to the enclave specification you chose. The associated root certificate will be used to verify that the attestation was signed by the expected party (e.g. Intel/AMD/Amazon, depending on the CC technology used). Any communication between the driver enclave and worker enclaves handling your data will also first be secured by additional attestation procedures. Which enclaves are trusted by the driver enclave is controlled by choosing the additional enclave specs from the respective compute packages. A list of enclave specifications, each encoding your trust in a particular enclave type, can be obtained by selecting a subset of the enclave specifications provided by the object `decentriq_platform.enclave_specifications`. Selecting the subset of versions should be done by calling its `versions` method. """ def __init__(self, specifications: Dict[str, EnclaveSpecification]): self.specifications = specifications def list(self) -> List[str]: """Get a list of all available enclave identifiers.""" return sorted(self.specifications.keys()) def versions(self, enclave_versions: List[str]) -> Dict[str, EnclaveSpecification]: """ Get the enclave specifications for the given versioned enclave types. Make sure to always include the specification of a *driver enclave*, e.g. `"decentriq.driver:v1"` as this is the node with which you communicate directly. Add additional versioned enclaves depending on the compute module you use. Refer to the main documentation page of each compute module to learn which enclaves are available. """ selected_specifications = {} for version in enclave_versions: enclave_type = version.split(":")[0] selected_specifications[enclave_type] = self.specifications[version] return selected_specifications def all(self) -> List[EnclaveSpecification]: """Get a list of all available enclave specifications.""" return list(self.specifications.values()) def merge(self, other): """ Merge two sets of enclave specifications into a single set. """ return EnclaveSpecifications({**self.specifications, **other.specifications})
Provider of the enclave specifications available on the Decentriq platform.
Enclave specifications enable you to express which particular enclaves you trust.
The field containing the measurement (e.g. mrenclave in the case of Intel SGX) identifies the exact binary that will process your data. Users of the Decentriq platform are encouraged to reproduce this value by building the enclave binary from audited source code and re-producing the measurement (in the case of Intel SGX, this would involve simply hashing the produced executable).
When connecting to the driver enclave, the configured attestation algorithm will guarantee that the enclave you connect to is the one corresponding to the enclave specification you chose. The associated root certificate will be used to verify that the attestation was signed by the expected party (e.g. Intel/AMD/Amazon, depending on the CC technology used).
Any communication between the driver enclave and worker enclaves handling your data will also first be secured by additional attestation procedures. Which enclaves are trusted by the driver enclave is controlled by choosing the additional enclave specs from the respective compute packages.
A list of enclave specifications, each encoding your trust in a particular enclave type, can be obtained by selecting a subset of the enclave specifications provided by the object decentriq_platform.enclave_specifications. Selecting the subset of versions should be done by calling its versions method.
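For example, trusting the latest driver and SQL worker listed in the Available enclaves section could look like this (a sketch; choose the versions that match the enclaves your data room actually uses):

```
import decentriq_platform as dq

# Sketch: select the specifications of the enclaves you trust.
enclave_specs = dq.enclave_specifications.versions([
    "decentriq.driver:v4",
    "decentriq.sql-worker:v4",
])
```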
View Source
def __init__(self, specifications: Dict[str, EnclaveSpecification]): self.specifications = specifications
View Source
def list(self) -> List[str]: """Get a list of all available enclave identifiers.""" return sorted(self.specifications.keys())
Get a list of all available enclave identifiers.
View Source
def versions(self, enclave_versions: List[str]) -> Dict[str, EnclaveSpecification]: """ Get the enclave specifications for the given versioned enclave types. Make sure to always include the specification of a *driver enclave*, e.g. `"decentriq.driver:v1"` as this is the node with which you communicate directly. Add additional versioned enclaves depending on the compute module you use. Refer to the main documentation page of each compute module to learn which enclaves are available. """ selected_specifications = {} for version in enclave_versions: enclave_type = version.split(":")[0] selected_specifications[enclave_type] = self.specifications[version] return selected_specifications
Get the enclave specifications for the given versioned enclave types.
Make sure to always include the specification of a driver enclave, e.g. "decentriq.driver:v1" as this is the node with which you communicate directly. Add additional versioned enclaves depending on the compute module you use. Refer to the main documentation page of each compute module to learn which enclaves are available.
View Source
def all(self) -> List[EnclaveSpecification]: """Get a list of all available enclave specifications.""" return list(self.specifications.values())
Get a list of all available enclave specifications.
View Source
def merge(self, other): """ Merge two sets of enclave specifications into a single set. """ return EnclaveSpecifications({**self.specifications, **other.specifications})
Merge two sets of enclave specifications into a single set.
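A brief sketch of merging, assuming both variables are EnclaveSpecifications instances (e.g. catalogues coming from different compute packages):

```
# Sketch: combine two sets of enclave specifications.
combined_specs = driver_specs.merge(worker_specs)
```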
View Source
class Key(): """ This class wraps the key material that is used to encrypt the files that are uploaded to the Decentriq platform. """ material: bytes def __init__(self, material: Optional[bytes] = None): """ Returns a new `Key` instance; the raw key material can optionally be specified. """ if material is None: key_bytes = os.urandom(KEY_LEN) else: if len(material) != KEY_LEN: raise Exception("Invalid key length, must be 32 bytes") key_bytes = material self.material = key_bytes
This class wraps the key material that is used to encrypt the files that are uploaded to the Decentriq platform.
View Source
def __init__(self, material: Optional[bytes] = None): """ Returns a new `Key` instance; the raw key material can optionally be specified. """ if material is None: key_bytes = os.urandom(KEY_LEN) else: if len(material) != KEY_LEN: raise Exception("Invalid key length, must be 32 bytes") key_bytes = material self.material = key_bytes
Returns a new Key instance; the raw key material can optionally be specified.
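For instance (a sketch, assuming Key is re-exported from the main package as described in the introduction):

```
import os
from decentriq_platform import Key

fresh_key = Key()                  # generates 32 random bytes of key material
wrapped_key = Key(os.urandom(32))  # wraps existing 32-byte key material
```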
View Source
class StaticContent(Node): """ Computation node which outputs the content specified in its configuration. This is mostly used to allow users to specify dependencies with static content that is part of the DCR definition. """ def __init__( self, name: str, content: bytes, dependencies: List[str] = [] ) -> None: """ Create a node with the given name and content. In case the source of the content is a file on your local machine, you can open the file in binary mode before reading it: ``` # Note the "rb" argument with open("my_script.py", "rb") as data: my_script_content = data.read() # my_script_content can now be passed to the StaticContent constructor ``` """ config = serialize_length_delimited( DriverTaskConfig( staticContent=StaticContentConfig( content=content ) ) ) super().__init__( name, config=config, enclave_type="decentriq.driver", dependencies=dependencies, output_format=ComputeNodeFormat.RAW )
Computation node which outputs the content specified in its configuration. This is mostly used to allow users to specify dependencies with static content that is part of the DCR definition.
View Source
def __init__( self, name: str, content: bytes, dependencies: List[str] = [] ) -> None: """ Create a node with the given name and content. In case the source of the content is a file on your local machine, you can open the file in binary mode before reading it: ``` # Note the "rb" argument with open("my_script.py", "rb") as data: my_script_content = data.read() # my_script_content can now be passed to the StaticContent constructor ``` """ config = serialize_length_delimited( DriverTaskConfig( staticContent=StaticContentConfig( content=content ) ) ) super().__init__( name, config=config, enclave_type="decentriq.driver", dependencies=dependencies, output_format=ComputeNodeFormat.RAW )
Create a node with the given name and content.
In case the source of the content is a file on your local machine, you can open the file in binary mode before reading it:
# Note the "rb" argument
with open("my_script.py", "rb") as data:
my_script_content = data.read()
# my_script_content can now be passed to the StaticContent constructor
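Continuing the docstring example, the bytes read this way would then be wrapped in a node (a sketch with a placeholder node name):

```
# Sketch: wrap the script bytes in a StaticContent node.
script_node = StaticContent("my_script", my_script_content)
```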
View Source
class Noop(Node): """ Computation node which does not perform any operation and produces an empty output. This is mostly used to allow users to test the execution of other computation nodes without giving access to the results. """ def __init__(self, name: str, dependencies: List[str] = []) -> None: config = serialize_length_delimited( DriverTaskConfig(noop=NoopConfig()) ) super().__init__( name, config, "decentriq.driver", dependencies, ComputeNodeFormat.RAW )
Computation node which does not perform any operation and produces an empty output. This is mostly used to allow users to test the execution of other computation nodes without giving access to the results.
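For example, a tester could be granted execute permission on a Noop node that depends on the actual computation, letting them trigger it without reading its output (a sketch with placeholder names):

```
# Sketch: let testers trigger "my_computation" without seeing its results.
test_gate = Noop("test_gate", dependencies=["my_computation"])
```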
View Source
def __init__(self, name: str, dependencies: List[str] = []) -> None: config = serialize_length_delimited( DriverTaskConfig(noop=NoopConfig()) ) super().__init__( name, config, "decentriq.driver", dependencies, ComputeNodeFormat.RAW )
View Source
""" .. include:: ../../../decentriq_platform_docs/sql_getting_started.md --- """ __docformat__ = "restructuredtext" from .compute import SqlCompute, SqlSchemaVerifier from .proto.compute_sql_pb2 import ( PrimitiveType, ) from .helpers import ( TabularDataNodeBuilder, read_sql_query_result_as_string, read_input_csv_file, read_input_csv_string, upload_and_publish_tabular_dataset, ) __all__ = [ "SqlCompute", "TabularDataNodeBuilder", "PrimitiveType", "read_input_csv_file", "read_input_csv_string", "upload_and_publish_tabular_dataset", "read_sql_query_result_as_string", "SqlSchemaVerifier", ]
View Source
""" .. include:: ../../../decentriq_platform_docs/container_getting_started.md --- """ __docformat__ = "restructuredtext" from .compute import StaticContainerCompute from .helpers import read_result_as_zipfile __all__ = [ "StaticContainerCompute", "read_result_as_zipfile", ]
View Source
import chily import os from hashlib import sha256 from .proto import serialize_length_delimited from .proto import ChunkHeader, EncryptionHeader, VersionHeader from typing import BinaryIO, Iterator, Tuple, Optional from io import TextIOWrapper __all__ = ["Key"] KEY_LEN = 32 class Key(): """ This class wraps the key material that is used to encrypt the files that are uploaded to the Decentriq platform. """ material: bytes def __init__(self, material: Optional[bytes] = None): """ Returns a new `Key` instance; the raw key material can optionally be specified. """ if material is None: key_bytes = os.urandom(KEY_LEN) else: if len(material) != KEY_LEN: raise Exception("Invalid key length, must be 32 bytes") key_bytes = material self.material = key_bytes def create_chunk_header(extra_entropy: bytes) -> bytes: chunk_header = ChunkHeader() chunk_header.extraEntropy = extra_entropy chunk_header_bytes = serialize_length_delimited(chunk_header) return chunk_header_bytes def create_version_header() -> bytes: version_header = VersionHeader() version_header.version = 0 return serialize_length_delimited(version_header) # Returns (integrity hash, encrypted blob) def create_encrypted_chunk( key: bytes, extra_entropy: bytes, data: bytes ) -> Tuple[bytes, bytes]: chunk_bytes = [] version_header = create_version_header() chunk_bytes.append(version_header) chunk_header = create_chunk_header(extra_entropy) chunk_bytes.append(chunk_header) chunk_bytes.append(data) chunk = b''.join(chunk_bytes) chunk_hasher = sha256() chunk_hasher.update(chunk) chunk_hash = chunk_hasher.digest() cipher = StorageCipher(key) encrypted_chunk = cipher.encrypt(chunk) return chunk_hash, encrypted_chunk class Chunker(Iterator): def __init__(self, input_stream: BinaryIO, chunk_size: int): self.chunk_size = chunk_size self.input_stream = input_stream def __iter__(self) -> Iterator[Tuple[bytes, bytes]]: self.input_stream.seek(0) return self # returns (hash, chunk) def __next__(self) -> Tuple[bytes, bytes]: version_header_bytes = create_version_header() chunk_header_bytes = create_chunk_header(os.urandom(16)) # Does not account for header size current_chunk_size = 0 chunk_bytes = [version_header_bytes, chunk_header_bytes] input_chunk_bytes = self.input_stream.read(self.chunk_size) if len(input_chunk_bytes) == 0: raise StopIteration chunk_bytes.append(input_chunk_bytes) chunk = b''.join(chunk_bytes) chunk_hasher = sha256() chunk_hasher.update(chunk) chunk_hash = chunk_hasher.digest() return chunk_hash, chunk class StorageCipher(): def __init__(self, symmetric_key: bytes): self.enc_key = symmetric_key self.cipher: chily.Cipher = chily.Cipher.from_symmetric(self.enc_key) def encrypt(self, data: bytes) -> bytes: nonce = chily.Nonce.from_random() encrypted_data = self.cipher.encrypt("storage cipher", data, nonce) encryption_header = EncryptionHeader() encryption_header.chilyKey.encryptionNonce = bytes(nonce.bytes) serialized_encryption_header = serialize_length_delimited(encryption_header) encrypted_data_with_header = bytes(list(serialized_encryption_header) + encrypted_data) return encrypted_data_with_header
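To illustrate the flow implemented by these internal helpers (not part of the public API; the sample payloads are made up):

```
import os
from io import BytesIO

# Sketch: encrypt a small payload as a single chunk.
key = Key()
chunk_hash, encrypted = create_encrypted_chunk(
    key.material, os.urandom(16), b"col1,col2\n1,2\n"
)

# Sketch: stream a larger input in fixed-size chunks; each chunk carries
# a version header and a chunk header and is paired with its SHA-256 digest.
for digest, chunk in Chunker(BytesIO(b"some,longer,csv,content"), chunk_size=1024):
    pass
```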
View Source
from typing import Dict, List, Tuple from .types import EnclaveSpecification from .compute import GcgDriverDecoder from .sql.compute import SqlWorkerDecoder from .container.compute import ContainerWorkerDecoder from .proto import ( AttestationSpecification, AttestationSpecificationIntelEpid, AttestationSpecificationIntelDcap, AttestationSpecificationAwsNitro, ) import asn1crypto.pem from .certs import ( aws_nitro_root_ca_pem, intel_sgx_dcap_root_ca, intel_sgx_ias_root_ca ) intel_sgx_dcap_root_ca_der = asn1crypto.pem.unarmor(intel_sgx_dcap_root_ca)[2] intel_sgx_ias_root_ca_der = asn1crypto.pem.unarmor(intel_sgx_ias_root_ca)[2] aws_nitro_root_ca_der = asn1crypto.pem.unarmor(aws_nitro_root_ca_pem)[2] SPECIFICATIONS = { "decentriq.driver:v4": EnclaveSpecification( name="decentriq.driver", version="4", proto=AttestationSpecification( intelDcap=AttestationSpecificationIntelDcap( mrenclave=bytes.fromhex( "f3746dc7b06d7f1aa6fc1fc9dbf61920663466bc75476a9b1460f6a112be71cf" ), dcapRootCaDer=intel_sgx_dcap_root_ca_der, accept_debug=False, accept_out_of_date=False, accept_configuration_needed=False, accept_sw_hardening_needed=False, accept_revoked=False, ) ), workerProtocols=[0], decoder=GcgDriverDecoder(), clientProtocols=[1], ), "decentriq.driver:v3": EnclaveSpecification( name="decentriq.driver", version="3", proto=AttestationSpecification( intelDcap=AttestationSpecificationIntelDcap( mrenclave=bytes.fromhex( "564d4744604e252e046be2b2ba4d86fb7eb5a2ec85046735bd5abe62654b9d61" ), dcapRootCaDer=intel_sgx_dcap_root_ca_der, accept_debug=False, accept_out_of_date=False, accept_configuration_needed=False, accept_sw_hardening_needed=False, accept_revoked=False, ) ), workerProtocols=[0], decoder=GcgDriverDecoder(), clientProtocols=[1], ), "decentriq.driver:v2": EnclaveSpecification( name="decentriq.driver", version="2", proto=AttestationSpecification( intelDcap=AttestationSpecificationIntelDcap( mrenclave=bytes.fromhex( "a88ec2195974edf693e45b2ccdf39cf53c9382bb5309ba3863b5d0f2591542f1" ), dcapRootCaDer=intel_sgx_dcap_root_ca_der, accept_debug=False, accept_out_of_date=False, accept_configuration_needed=False, accept_sw_hardening_needed=False, accept_revoked=False, ) ), workerProtocols=[0], decoder=GcgDriverDecoder(), clientProtocols=[0], ), "decentriq.sql-worker:v4": EnclaveSpecification( name="decentriq.sql-worker", version="4", proto=AttestationSpecification( intelDcap=AttestationSpecificationIntelDcap( mrenclave=bytes.fromhex( "5f07de831d93b1ff5446ef0e17d8dcf0418ce8416cd0d5039c4266503fd4c7c9" ), dcapRootCaDer=intel_sgx_dcap_root_ca_der, accept_debug=False, accept_out_of_date=False, accept_configuration_needed=False, accept_sw_hardening_needed=False, accept_revoked=False, ) ), workerProtocols=[0], decoder=SqlWorkerDecoder() ), "decentriq.sql-worker:v3": EnclaveSpecification( name="decentriq.sql-worker", version="3", proto=AttestationSpecification( intelDcap=AttestationSpecificationIntelDcap( mrenclave=bytes.fromhex( "5b1838ec6d3509fe1c1ac1b13d394bc9df76057ddd18c1b8fff882b379d110e2" ), dcapRootCaDer=intel_sgx_dcap_root_ca_der, accept_debug=False, accept_out_of_date=False, accept_configuration_needed=False, accept_sw_hardening_needed=False, accept_revoked=False, ) ), workerProtocols=[0], decoder=SqlWorkerDecoder() ), "decentriq.sql-worker:v2": EnclaveSpecification( name="decentriq.sql-worker", version="2", proto=AttestationSpecification( intelDcap=AttestationSpecificationIntelDcap( mrenclave=bytes.fromhex( "c04e429edd5fc767e00ec4839b1f8c57375fda178905866ac6d4debee4fbe7d1" ), 
dcapRootCaDer=intel_sgx_dcap_root_ca_der, accept_debug=False, accept_out_of_date=False, accept_configuration_needed=False, accept_sw_hardening_needed=False, accept_revoked=False, ) ), workerProtocols=[0], decoder=SqlWorkerDecoder() ), "decentriq.python-ml-worker:v2": EnclaveSpecification( name="decentriq.python-ml-worker", version="2", proto=AttestationSpecification( awsNitro=AttestationSpecificationAwsNitro( nitroRootCaDer=aws_nitro_root_ca_der, pcr0=bytes.fromhex("ea004dce7a444fdf4603b903f9bc730494fc6c876ffd86fdf8f7c3910803b38ca60d0e38af0bbd44f159918d90fa46c4"), pcr1=bytes.fromhex("ea004dce7a444fdf4603b903f9bc730494fc6c876ffd86fdf8f7c3910803b38ca60d0e38af0bbd44f159918d90fa46c4"), pcr2=bytes.fromhex("21b9efbc184807662e966d34f390821309eeac6802309798826296bf3e8bec7c10edb30948c90ba67310f7b964fc500a"), pcr8=bytes.fromhex("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") ) ), workerProtocols=[0], decoder=ContainerWorkerDecoder() ), "decentriq.python-ml-worker:v1": EnclaveSpecification( name="decentriq.python-ml-worker", version="1", proto=AttestationSpecification( awsNitro=AttestationSpecificationAwsNitro( nitroRootCaDer=aws_nitro_root_ca_der, pcr0=bytes.fromhex("9103b7019cd97a2837b1631648bc683b3470b402b3c0638d8ff16178c7fd031407c06aeff8f96517a320fb573d7d281f"), pcr1=bytes.fromhex("9103b7019cd97a2837b1631648bc683b3470b402b3c0638d8ff16178c7fd031407c06aeff8f96517a320fb573d7d281f"), pcr2=bytes.fromhex("21b9efbc184807662e966d34f390821309eeac6802309798826296bf3e8bec7c10edb30948c90ba67310f7b964fc500a"), pcr8=bytes.fromhex("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") ) ), workerProtocols=[0], decoder=ContainerWorkerDecoder() ), "decentriq.python-synth-data-worker:v2": EnclaveSpecification( name="decentriq.python-synth-data-worker", version="2", proto=AttestationSpecification( awsNitro=AttestationSpecificationAwsNitro( nitroRootCaDer=aws_nitro_root_ca_der, pcr0=bytes.fromhex("3759aad200a137252997a459a38c1953c8868844c7271487ae536a441133c407fb2dd3e1d4b167d997a8aa48b54341c1"), pcr1=bytes.fromhex("3759aad200a137252997a459a38c1953c8868844c7271487ae536a441133c407fb2dd3e1d4b167d997a8aa48b54341c1"), pcr2=bytes.fromhex("21b9efbc184807662e966d34f390821309eeac6802309798826296bf3e8bec7c10edb30948c90ba67310f7b964fc500a"), pcr8=bytes.fromhex("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") ) ), workerProtocols=[0], decoder=ContainerWorkerDecoder() ), "decentriq.python-synth-data-worker:v1": EnclaveSpecification( name="decentriq.python-synth-data-worker", version="1", proto=AttestationSpecification( awsNitro=AttestationSpecificationAwsNitro( nitroRootCaDer=aws_nitro_root_ca_der, pcr0=bytes.fromhex("b1eeccf3396b7d2163b7ff90017dfea71911b282e7ac683574237b0f6011d1617cb42322342e43d8721ccb9aefb918f4"), pcr1=bytes.fromhex("b1eeccf3396b7d2163b7ff90017dfea71911b282e7ac683574237b0f6011d1617cb42322342e43d8721ccb9aefb918f4"), pcr2=bytes.fromhex("21b9efbc184807662e966d34f390821309eeac6802309798826296bf3e8bec7c10edb30948c90ba67310f7b964fc500a"), pcr8=bytes.fromhex("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") ) ), workerProtocols=[0], decoder=ContainerWorkerDecoder() ), "decentriq.r-latex-worker:v2": EnclaveSpecification( name="decentriq.r-latex-worker", version="2", proto=AttestationSpecification( awsNitro=AttestationSpecificationAwsNitro( nitroRootCaDer=aws_nitro_root_ca_der, 
pcr0=bytes.fromhex("43223aecfeae895fcb966b41618611f16d675991c472ad58d358a846bd9fa472d2eeabd130ad1aee97bf74067ccf5ac8"), pcr1=bytes.fromhex("43223aecfeae895fcb966b41618611f16d675991c472ad58d358a846bd9fa472d2eeabd130ad1aee97bf74067ccf5ac8"), pcr2=bytes.fromhex("21b9efbc184807662e966d34f390821309eeac6802309798826296bf3e8bec7c10edb30948c90ba67310f7b964fc500a"), pcr8=bytes.fromhex("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") ) ), workerProtocols=[0], decoder=ContainerWorkerDecoder() ), "decentriq.r-latex-worker:v1": EnclaveSpecification( name="decentriq.r-latex-worker", version="1", proto=AttestationSpecification( awsNitro=AttestationSpecificationAwsNitro( nitroRootCaDer=aws_nitro_root_ca_der, pcr0=bytes.fromhex("7e78d613db8801fe051abc1325eb1dfd12a42b3c8cd9ba6925c35d2bc91549c79b62577c1fd799bedc8ed8a3f85422e6"), pcr1=bytes.fromhex("7e78d613db8801fe051abc1325eb1dfd12a42b3c8cd9ba6925c35d2bc91549c79b62577c1fd799bedc8ed8a3f85422e6"), pcr2=bytes.fromhex("21b9efbc184807662e966d34f390821309eeac6802309798826296bf3e8bec7c10edb30948c90ba67310f7b964fc500a"), pcr8=bytes.fromhex("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") ) ), workerProtocols=[0], decoder=ContainerWorkerDecoder() ) } class EnclaveSpecifications: """ Provider of the available enclave specifications provided by the Decentriq platform. Enclave specifications enable you to express which particular enclaves you trust. The field containing the measurement (e.g. `mrenclave` in the case of Intel SGX) identifies the exact binary that will process your data. Users of the Decentriq platform are encouraged to reproduce this value by building the enclave binary from audited source code and re-producing the measurement (in the case of Intel SGX, this would involve simply hashing the produced executable). When connecting to the driver enclave, the configured attestation algorithm will guarantee that the enclave you connect to is the one corresponding to the enclave specification you chose. The associated root certificate will be used to verify that the attestation was signed by the expected party (e.g. Intel/AMD/Amazon, depending on the CC technology used). Any communication between the driver enclave and worker enclaves handling your data will also first be secured by additional attestation procedures. Which enclaves are trusted by the driver enclave is controlled by choosing the additional enclave specs from the respective compute packages. A list of enclave specifications, each encoding your trust in a particular enclave type, can be obtained by selecting a subset of the enclave specifications provided by the object `decentriq_platform.enclave_specifications`. Selecting the subset of versions should be done by calling its `versions` method. """ def __init__(self, specifications: Dict[str, EnclaveSpecification]): self.specifications = specifications def list(self) -> List[str]: """Get a list of all available enclave identifiers.""" return sorted(self.specifications.keys()) def versions(self, enclave_versions: List[str]) -> Dict[str, EnclaveSpecification]: """ Get the enclave specifications for the given versioned enclave types. Make sure to always include the specification of a *driver enclave*, e.g. `"decentriq.driver:v1"` as this is the node with which you communicate directly. Add additional versioned enclaves depending on the compute module you use. 
Refer to the main documentation page of each compute module to learn which enclaves are available. """ selected_specifications = {} for version in enclave_versions: enclave_type = version.split(":")[0] selected_specifications[enclave_type] = self.specifications[version] return selected_specifications def all(self) -> List[EnclaveSpecification]: """Get a list of all available enclave specifications.""" return list(self.specifications.values()) def merge(self, other): """ Merge two sets of enclave specifications into a single set. """ return EnclaveSpecifications({**self.specifications, **other.specifications}) enclave_specifications: EnclaveSpecifications = EnclaveSpecifications(SPECIFICATIONS) """The main catalogue of enclave specifications available within the Decentriq platform."""
View Source
from google.protobuf.json_format import MessageToDict from .proto import AttestationSpecification, ComputeNodeProtocol, DriverTaskConfig from typing import List, Dict, Optional, Any from typing_extensions import TypedDict from enum import Enum from .proto.length_delimited import parse_length_delimited from .proto.compute_sql_pb2 import SqlWorkerConfiguration from .container.proto.compute_container_pb2 import ContainerWorkerConfiguration __all__ = [ "EnclaveSpecification", "JobId", "DataRoomDescription", "DatasetDescription", ] class JobId: """ Class for identifying running or already run jobs. Objects of this class can be used to retrieve results for processed computations. """ def __init__(self, job_id: str, compute_node_id: str): self.id = job_id """The identifier of the job that processed a particular computation.""" self.compute_node_id = compute_node_id """The id of the computation that was processed.""" class ScopeTypes(str, Enum): USER_FILE = "user_file" DATA_ROOM_INTERMEDIATE_DATA = "dataroom_intermediate_data" DATA_ROOM_COMMITS_DATA = "dataroom_commits_data" class UserResponse(TypedDict): id: str email: str class UserCsrRequest(TypedDict): csrPem: str class UserCsrResponse(TypedDict): certChainPem: str class SystemCaResponse(TypedDict): rootCertificate: str class CreateSessionRequest(TypedDict): attestationSpecificationHash: str class SessionJsonResponse(TypedDict): sessionId: str attestationSpecificationHash: str class FinalizeUpload(TypedDict): uploadId: str manifest: str name: str manifestHash: str chunks: List[str] scopeId: str class ChunkWrapper(TypedDict): hash: str data: str class UploadDescription(TypedDict): uploadId: str class ChunkDescription(TypedDict): chunkHash: str class DataRoomDescription(TypedDict): dataRoomId: str name: str description: str mrenclave: str ownerEmail: str creationDate: str status: str class DatasetDescription(TypedDict): """ This class includes information about an uploaded dataset. """ datasetId: str """ The data set id as a hex-encoded string. This id is also called the manifest hash. """ name: str """The name of this dataset""" description: str """An optional description""" ownerEmail: str """The original uploader of the dataset""" creationDate: str class SignatureResponse(TypedDict): type: str data: List[int] class EnclaveMessage(TypedDict): data: str class FatquoteResBody(TypedDict): fatquoteBase64: str class DatasetManifestMetadata(TypedDict): name: str manifestHash: str chunks: List[str] class EnclaveSpecification(TypedDict): """ This class includes information about an enclave deployed in the platform. Please refer to `decentriq_platform.EnclaveSpecifications` for a detailed explanation. """ name: str """The name of the enclave.""" version: str """The version of the enclave.""" proto: AttestationSpecification """The Protobuf object.""" workerProtocols: List[int] """The worker protocol versions supported by the node""" decoder: Optional[Any] """ Decoder object that can be used to decode the binary configs belonging to enclaves of this type. 
""" clientProtocols: Optional[List[int]] """The client protocol versions supported by the node""" class CreateScopeRequest(TypedDict): metadata: Dict[str, str] class ScopeJson(TypedDict): scopeId: str metadata: Dict[str, str] class EnclaveSpecificationJson(TypedDict): name: str version: str spec: str class EnclaveSpecificationResponse(TypedDict): attestationSpecs: List[EnclaveSpecificationJson] class Tcb(TypedDict): sgxtcbcomp01svn: int sgxtcbcomp02svn: int sgxtcbcomp03svn: int sgxtcbcomp04svn: int sgxtcbcomp05svn: int sgxtcbcomp06svn: int sgxtcbcomp07svn: int sgxtcbcomp08svn: int sgxtcbcomp09svn: int sgxtcbcomp10svn: int sgxtcbcomp11svn: int sgxtcbcomp12svn: int sgxtcbcomp13svn: int sgxtcbcomp14svn: int sgxtcbcomp15svn: int sgxtcbcomp16svn: int pcesvn: int class TcbLevel(TypedDict): tcb: Tcb tcbStatus: str class TcbInfo(TypedDict): version: int issueDate: str nextUpdate: str fmspc: str pceId: str tcbType: int tcbEvaluationDataNumber: int tcbLevels: List[TcbLevel] class TcbInfoContainer(TypedDict): tcbInfo: TcbInfo signature: str class IasResponse(TypedDict): isvEnclaveQuoteBody: str isvEnclaveQuoteStatus: str
View Source
from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding, rsa __all__ = ["Auth"] PKey = rsa.RSAPrivateKey class Auth: """ This class wraps the certificate used to identify a user and implements the signing of the messages that are sent to the enclave. """ def __init__( self, certificate_chain: bytes, keypair: PKey, user_id: str, ): """ Create an authentication object with the supplied certificate chain and keypair. To use the identity provider of the Decentriq platform, use `decentriq_platform.Client.create_auth` """ self.certificate_chain = certificate_chain self.kp = keypair self.user_id = user_id def _get_user_id(self) -> str: return self.user_id def _sign(self, data: bytes) -> bytes: return self.kp.sign(data, padding.PKCS1v15(), hashes.SHA512()) def get_certificate_chain_pem(self) -> bytes: """ Returns the chain of certificates in PEM format """ return self.certificate_chain class Sigma: def __init__(self, signature: bytes, mac_tag: bytes, auth_pki: Auth): self.signature: bytes = signature self.mac_tag: bytes = mac_tag self.auth_pki: Auth = auth_pki def get_mac_tag(self) -> bytes: return self.mac_tag def get_signature(self) -> bytes: return self.signature def get_cert_chain(self) -> bytes: return self.auth_pki.get_certificate_chain_pem() def generate_key(bit_size: int = 4096) -> PKey: key = rsa.generate_private_key( public_exponent=65537, key_size=bit_size, ) return key def generate_csr(user_email: str, key: PKey) -> bytes: csr_builder = x509.CertificateSigningRequestBuilder() csr = csr_builder.subject_name(x509.Name([ x509.NameAttribute(NameOID.COMMON_NAME, user_email) ])).add_extension( x509.BasicConstraints(ca=False, path_length=None), critical=True ).add_extension( x509.KeyUsage( digital_signature=True, content_commitment=True, key_encipherment=True, data_encipherment=False, key_agreement=False, key_cert_sign=False, crl_sign=False, encipher_only=False, decipher_only=False ), critical=True ).sign(key, hashes.SHA512()) return csr.public_bytes(serialization.Encoding.PEM)
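The module-level helpers shown above can be combined to produce the keypair and CSR needed for a custom PKI (a sketch; the signed certificate chain would normally be issued by your own identity provider):

```
# Sketch: generate a keypair and a certificate signing request.
keypair = generate_key()
csr_pem = generate_csr("user@example.com", keypair)

# Once a CA has signed the CSR and returned a PEM chain:
# auth = Auth(cert_chain_pem, keypair, "user@example.com")
```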
View Source
from typing import List, Optional, Tuple import datetime import base64 import json import uuid from .authentication import generate_csr, generate_key, Auth from .types import ( DataRoomDescription, DatasetDescription, UserCsrResponse, UserCsrRequest, ) from .api import API, Endpoints from .proto import ( AuthenticationMethod, TrustedPki, DataRoom, DataRoomStatus, ConfigurationModification ) from .config import ( DECENTRIQ_CLIENT_ID, DECENTRIQ_API_PLATFORM_HOST, DECENTRIQ_API_PLATFORM_PORT, DECENTRIQ_API_PLATFORM_USE_TLS ) __all__ = [ "ClientPlatformFeatures", "SessionPlatformFeatures" ] def platform_hash_from_str(s: str) -> bytes: """ The reverse operation of `platform_hash_to_str`. """ list_u8 = [int(x) for x in base64.b64decode(s).decode('ascii').split(',')] return bytes(list_u8) def platform_hash_to_str(bs: bytes) -> str: """ Transform a bytestring into the encoded form in which hashes are sent to the platform backend. The platform stores hashes (both files and data rooms) in the following form: Given a series of bytes with values 1 2 3, it will store the ascii codes of the string '1,2,3' (including the commas), so we get the byte array 49 44 50 44 51. This is then base64 encoded and sent to the backend where it is base64 decoded. """ return base64.b64encode(','.join([str(b) for b in bs]).encode('ascii')).decode('ascii') _GET_DATASET_LINKS_QUERY = """ query getDatasetLinks($filter: DatasetLinkFilter!) { datasetLinks(filter: $filter) { nodes { datasetLinkUuid: datasetLinkId computeNode { computeNodeUuid: computeNodeId nodeName dataRoom { dataRoomUuid: dataRoomId dataRoomId: dataRoomHashEncoded } } dataset { name datasetId: datasetHashEncoded } } } } """ class PlatformApi: """ Class to interact with the GraphQL endpoint of the Decentriq platform. This class enables the integration of the Python SDK with the browser-based Decentriq platform UI by providing the necessary CRUD functions. """ user_email: str _http_api: API def __init__(self, user_email: str, http_api: API): self.user_email = user_email self._http_api = http_api def get_data_room_descriptions(self) -> List[DataRoomDescription]: data = self._post_graphql( """ query getDataRooms($filter: DataRoomFilter) { dataRooms(filter: $filter, orderBy: NAME_ASC) { nodes { dataRoomId: dataRoomHashEncoded name description mrenclave ownerEmail creationDate: createdAt state { status } } } } """ ) return self._parse_get_data_rooms_response(data) def update_data_room_status( self, data_room_hash: str, status # type: DataRoomStatus.V ): data_room = self.get_data_room_by_hash(data_room_hash) if not data_room: raise Exception(f"Unable to find data room with hash '{data_room_hash}'") data_room_uuid = data_room["dataRoomUuid"] current_datetime_str = datetime.datetime.now().isoformat() self._post_graphql( """ mutation upsertState($input: UpsertStateInput!) 
{ upsertState(input: $input) { state { id status statusUpdatedAt updatedAt updatedByEmail } } } """, { "input": { "clientMutationId": str(uuid.uuid4()), "state": { "dataRoomId": data_room_uuid, "status": DataRoomStatus.Name(status).upper(), "statusUpdatedAt": current_datetime_str, } } } ) def get_dataset_link( self, data_room_id: str, leaf_id: str, ) -> Optional[dict]: data_room = self.get_data_room_by_hash(data_room_id) if not data_room: raise Exception(f"Unable to get data room for hash '{data_room_id}'") data_room_uuid = data_room["dataRoomUuid"] compute_node = self._get_compute_node(data_room_uuid, leaf_id) if compute_node: compute_node_uuid = compute_node["computeNodeUuid"] return self._get_dataset_link(compute_node_uuid) else: raise Exception( f"Unable to find leaf with name '{leaf_id}' for data room '{data_room_id}'" ) def get_dataset_links_for_manifest_hash( self, manifest_hash: str, ) -> Optional[List[str]]: links = self._get_dataset_links_for_manifest_hash(manifest_hash) return [link["datasetLinkUuid"] for link in links] def _get_dataset_links_for_manifest_hash( self, manifest_hash: str, ) -> List[dict]: data = self._post_graphql( _GET_DATASET_LINKS_QUERY, { "filter": { "datasetHashEncoded": {"equalTo": manifest_hash}, } } ) return data["datasetLinks"]["nodes"] def _get_dataset_link(self, compute_node_uuid: str) -> Optional[dict]: data = self._post_graphql( _GET_DATASET_LINKS_QUERY, { "filter": { "computeNode": { "computeNodeId": {"equalTo": compute_node_uuid} } } } ) nodes = data["datasetLinks"]["nodes"] if nodes: return nodes[0] else: return None def get_data_rooms_with_published_dataset(self, manifest_hash) -> List[str]: links = self._get_dataset_links_for_manifest_hash(manifest_hash) data_room_ids = [] for link in links: if "computeNode" in link and "dataRoom" in link["computeNode"]: data_room_id = link["computeNode"]["dataRoom"].get("dataRoomId") if data_room_id: data_room_ids.append(data_room_id) return data_room_ids def delete_dataset_link( self, data_room_id: str, leaf_id: str, ) -> Optional[dict]: dataset_link = self.get_dataset_link(data_room_id, leaf_id) if not dataset_link: raise Exception( f"Unable to find a dataset link for data room '{data_room_id}'" + f" and data node '{leaf_id}'" ) else: self._post_graphql( """ mutation deleteDatasetLink($input: DeleteDatasetLinkInput!) { deleteDatasetLink(input: $input) { clientMutationId } } """, { "input": { "clientMutationId": str(uuid.uuid4()), "datasetLinkId": dataset_link["datasetLinkUuid"] } } ) def delete_dataset_links_for_manifest_hash( self, manifest_hash: str, ) -> Optional[dict]: uuids = self.get_dataset_links_for_manifest_hash(manifest_hash) if uuids: for link_id in uuids: self._post_graphql( """ mutation deleteDatasetLink($input: DeleteDatasetLinkInput!) { deleteDatasetLink(input: $input) { clientMutationId } } """, { "input": { "clientMutationId": str(uuid.uuid4()), "datasetLinkId": link_id } } ) def _delete_data_room(self, data_room_uuid: str) -> str: data = self._post_graphql( """ mutation deleteDataRoom($input: DeleteDataRoomByIdInput!) 
{ deleteDataRoomById(input: $input) { clientMutationId dataRoom { id } } } """, { "input": { "clientMutationId": str(uuid.uuid4()), "id": data_room_uuid, } } ) deleted_id = data.get("deleteDataRoomById", {}).get("dataRoom", {}).get("id") if deleted_id is None: raise Exception( "Received malformed response when trying to delete DCR in backend" ) else: return data_room_uuid def publish_data_room( self, data_room_definition: Tuple[DataRoom, List[ConfigurationModification]], data_room_hash: str, attestation_specification_hash: str, additional_fields: dict = {}, ): data_room, conf_modifications = data_room_definition owner_email = data_room.ownerEmail participant_emails = [ op.add.element.userPermission.email for op in conf_modifications if op.HasField("add") and op.add.element.HasField("userPermission") ] if owner_email in participant_emails: participant_emails.remove(owner_email) user_permissions_input = [] for participant_email in participant_emails: user_permissions_input.append({ "email": participant_email, }) data = self._post_graphql( """ mutation createDataRoom($input: CreateDataRoomInput!) { createDataRoom(input: $input) { dataRoom { dataRoomUuid: dataRoomId dataRoomHashEncoded lock { isLocked } } } } """, { "input": { "clientMutationId": str(uuid.uuid4()), "dataRoom": { "dataRoomHash": platform_hash_to_str(bytes.fromhex(data_room_hash)), "dataRoomHashEncoded": data_room_hash, "name": data_room.name, "description": data_room.description, "mrenclave": attestation_specification_hash, "source": "PYTHON", "ownerEmail": data_room.ownerEmail, "userPermissions": { "create": user_permissions_input }, **additional_fields, "lock": { "create": { "isLocked": True } } } } } ) data_room_uuid = data.get("createDataRoom", {}).get("dataRoom", {}).get("dataRoomUuid") if data_room_uuid is None: raise Exception( "Received malformed response when trying to create DCR in backend" ) def _parse_get_data_rooms_response(self, data) -> List[DataRoomDescription]: remove_keys = set(["state"]) def payload_to_dcr_description(d): # A DCR without a state should be displayed as a DCR with "Active" status # Uppercase status such as "STOPPED" should be displayed as "Stopped" to match # the proto version. if d.get("state") and d["state"].get("status"): status = d["state"]["status"].capitalize() else: status = "Active" d_cleaned = {k: v for k, v in d.items() if k not in remove_keys} return DataRoomDescription(status=status, **d_cleaned) dcr_dicts = data.get("dataRooms", {}).get("nodes", []) dcr_descriptions = [payload_to_dcr_description(d) for d in dcr_dicts] # Remove non-published DCRs from list return [desc for desc in dcr_descriptions if desc["dataRoomId"]] def get_data_room_by_hash(self, data_room_hash: str) -> Optional[dict]: data = self._post_graphql( """ query getDataRoomByHash($dataRoomHashEncoded: String!) 
{ dataRooms(condition: {dataRoomHashEncoded: $dataRoomHashEncoded}) { nodes { dataRoomUuid: dataRoomId source } } } """, { "dataRoomHashEncoded": data_room_hash } ) entries = data.get("dataRooms", {}).get("nodes", []) if len(entries) > 1: raise Exception("Cannot have multiple DCRs with the same hashcode") elif len(entries) == 0: return None else: return entries[0] def get_datasets_of_user(self) -> List[DatasetDescription]: data = self._post_graphql( """ query getDatasets { datasets { nodes { datasetId: datasetHashEncoded name description ownerEmail creationDate: createdAt datasetMeta { description } } } } """ ) nodes = data.get("datasets", {}).get("nodes", []) datasets = [] for node in nodes: meta_info = node.get("datasetMeta") if meta_info: description = meta_info.get("description") else: description = None datasets.append( DatasetDescription( datasetId=node["datasetId"], name=node["name"], description=description, ownerEmail=node["ownerEmail"], creationDate=node["creationDate"] ) ) return datasets def save_dataset_metadata( self, manifest_hash: str, file_name: str, description: str, owner_email: str ): self._post_graphql( """ mutation createDatasetMeta($input: CreateDatasetMetaInput!) { createDatasetMeta(input: $input) { datasetMeta { datasetHashEncoded name description } } } """, { "input": { "clientMutationId": str(uuid.uuid4()), "datasetMeta": { "datasetHash": platform_hash_to_str(bytes.fromhex(manifest_hash)), "name": file_name, "description": description, "ownerEmail": owner_email, } } } ) def delete_dataset_metadata(self, manifest_hash: str): self._post_graphql( """ mutation deleteDatasetMeta($datasetHashEncoded: String!) { deleteDatasetMetaByDatasetHashEncoded(input: {datasetHashEncoded: $datasetHashEncoded}) { datasetMeta { id } } } """, { "datasetHashEncoded": manifest_hash } ) def create_dataset_link( self, data_room_id: str, manifest_hash: str, leaf_id: str ): data_room = self.get_data_room_by_hash(data_room_id) if not data_room: raise Exception(f"Unable to get data room id for hash {data_room_id}") else: if data_room["source"] == "WEB": data_room_uuid = data_room["dataRoomUuid"] compute_node = self._get_compute_node(data_room_uuid, leaf_id) if compute_node: compute_node_uuid = compute_node["computeNodeUuid"] # Can link to both compute nodes (type BRANCH) and leaf nodes (type LEAF). # For SQL nodes we need to link to the verifier compute node. if compute_node_uuid: existing_link = self._get_dataset_link(compute_node_uuid) if existing_link: dataset_hash = existing_link["dataset"]["datasetId"] raise Exception( "The following dataset has already been published for this node." + " Please unpublish this dataset first." + f" Dataset: '{dataset_hash}'" ) self._post_graphql( """ mutation createDatasetLink($input: CreateDatasetLinkInput!) { createDatasetLink(input: $input) { clientMutationId datasetLink { datasetHashEncoded } } } """, { "input": { "clientMutationId": str(uuid.uuid4()), "datasetLink": { "datasetHash": platform_hash_to_str( bytes.fromhex(manifest_hash) ), "computeNodeId": compute_node_uuid, } } } ) else: raise Exception( f"Unable to find leaf with name '{leaf_id}' for data room '{data_room_id}'" ) else: pass def get_datasets_by_ids( self, manifest_hashes: List[str] ) -> List[DatasetDescription]: """ Returns a list of datasets with the given ids. """ data = self._post_graphql( """ query getDatasets($filter: DatasetMetaFilter!) 
{ datasetMetas(filter: $filter) { nodes { datasetId: datasetHashEncoded name description ownerEmail creationDate: createdAt } } } """, { "filter": { "datasetHashEncoded": { "in": manifest_hashes } } } ) nodes = data.get("datasetMetas", {}).get("nodes", []) return [DatasetDescription(**node) for node in nodes] def _get_compute_node(self, data_room_uuid: str, leaf_id: str) -> Optional[dict]: data = self._post_graphql( """ query getComputeNodes($filter: ComputeNodeFilter!) { computeNodes(filter: $filter) { nodes { computeNodeUuid: computeNodeId nodeName computeNodeType } } } """, { "filter": { "dataRoomId": { "equalTo": data_room_uuid }, "nodeName": { "equalTo": leaf_id }, } } ) nodes = data["computeNodes"]["nodes"] if nodes and len(nodes) == 1: return nodes[0] else: return None def _post_graphql(self, query: str, variables={}) -> dict: request_payload = { "query": query } if variables: request_payload["variables"] = variables response = self._http_api.post( "", json.dumps(request_payload), {"Content-type": "application/json"} ) payload = response.json() if "errors" in payload: error_messages = [] for message in payload["errors"]: decoded_message =\ base64.b64decode(message["message"]).decode('utf-8') error_messages.append(decoded_message) raise Exception(",".join(error_messages)) elif "data" in payload: return payload["data"] else: raise Exception("Malformed GraphQL response: no 'data' or 'errors' key") def create_platform_api( api_token: str, user_email: str, client_id: str = DECENTRIQ_CLIENT_ID, api_host: str = DECENTRIQ_API_PLATFORM_HOST, api_port: int = DECENTRIQ_API_PLATFORM_PORT, api_use_tls: bool = DECENTRIQ_API_PLATFORM_USE_TLS ): http_api = API( api_token, client_id, api_host, api_port, api_prefix="/api/decentriq-platform/graphql", use_tls=api_use_tls, additional_auth_headers={ "Authorization-User-Email": user_email, } ) return PlatformApi(user_email, http_api) class ClientPlatformFeatures: """ Provider of a list of methods and properties mirroring what is offered by the Decentriq web platform. """ _http_api: API _platform_api: PlatformApi def __init__( self, api_token: str, user_email: str, http_api: API, client_id: str = DECENTRIQ_CLIENT_ID, api_host: str = DECENTRIQ_API_PLATFORM_HOST, api_port: int = DECENTRIQ_API_PLATFORM_PORT, api_use_tls: bool = DECENTRIQ_API_PLATFORM_USE_TLS, ): """ Creating objects of this class directly is not necessary as an object of this class is provided as part of each `Client` object. """ self._http_api = http_api self._platform_api = create_platform_api(api_token, user_email, client_id, api_host, api_port, api_use_tls) self.user_email = user_email @property def decentriq_ca_root_certificate(self) -> bytes: """ Returns the root certificate used by the Decentriq identity provider. Note that when using this certificate in any authentication scheme, you trust Decentriq as an identity provider! """ url = Endpoints.SYSTEM_CERTIFICATE_AUTHORITY response = self._http_api.get(url).json() return response["rootCertificate"].encode("utf-8") @property def decentriq_pki_authentication(self) -> AuthenticationMethod: """ The authentication method that uses the Decentriq root certificate to authenticate users. This method should be specified when building a data room in case you want to interact with that data room either via the web interface or with sessions created using `create_auth_using_decentriq_pki`. Note that when using this authentication method you trust Decentriq as an identity provider! 
You can also create an `AuthenticationMethod` object directly and supply your own root certificate, with which to authenticate users connecting to your data room. In this case you will also need to issue corresponding user certificates and create your own custom `decentriq_platform.authentication.Auth` objects. """ root_pki = self.decentriq_ca_root_certificate return AuthenticationMethod( trustedPki=TrustedPki(rootCertificatePem=root_pki) ) def create_auth_using_decentriq_pki(self, email: str = None) -> Auth: """ Creates a `decentriq_platform.authentication.Auth` object which can be attached to a `decentriq_platform.session.Session`. Sessions created using such an `Auth` object will commonly be used with data rooms that have been configured to use the `decentriq_pki_authentication` authentication method. """ email = email if email is not None else self.user_email keypair = generate_key() csr = generate_csr(email, keypair) url = Endpoints.USER_CERTIFICATE.replace(":userId", email) csr_req = UserCsrRequest(csrPem=csr.decode("utf-8")) resp: UserCsrResponse = self._http_api.post(url, req_body=json.dumps(csr_req)).json() cert_chain_pem = resp["certChainPem"].encode("utf-8") auth = Auth(cert_chain_pem, keypair, email) return auth def get_data_room_descriptions(self) -> List[DataRoomDescription]: """ Returns a list of descriptions of all the data rooms a user created or participates in. """ return self._platform_api.get_data_room_descriptions() def get_available_datasets(self) -> List[DatasetDescription]: """ Returns a list of datasets that the current user uploaded, regardless of whether they have already been connected to a data room or not. """ return self._platform_api.get_datasets_of_user() class SessionPlatformFeatures: """ A provider for methods that mirror functionality known from the Decentriq web platform. This class is similar to `ClientPlatformFeatures` but methods provided on this class require communication with an enclave. """ _platform_api: PlatformApi def __init__(self, session, api: PlatformApi): """ Instances of this class should not be created directly but rather be accessed via the `Session.platform` field. """ self._session = session self._platform_api = api def get_published_datasets( self, data_room_id: str ) -> List[DatasetDescription]: """ Get a list of all the datasets that were published for a given data room. """ response = self._session.retrieve_published_datasets(data_room_id) manifest_hashes =\ [dataset.datasetHash.hex() for dataset in response.publishedDatasets] return self._platform_api.get_datasets_by_ids(manifest_hashes)
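A rough sketch of how these features are reached in practice (the platform attribute on the client is an assumption; Session.platform is named in the docstring above):

```
# Sketch: platform features as reached from a Client and a Session.
descriptions = client.platform.get_data_room_descriptions()
datasets = client.platform.get_available_datasets()
published = session.platform.get_published_datasets(data_room_id)
```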
View Source
from __future__ import annotations import chily import re import hashlib import hmac import json from base64 import b64encode, b64decode from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives import hashes from typing import Any, List, Tuple, TYPE_CHECKING, Iterator, Optional, Dict from time import sleep from .platform import PlatformApi from .api import Endpoints from .authentication import Auth, Sigma from .proto import ( DataRoom, ComputeNodeProtocol, DataNoncePubkey, Request, Response, CreateDataRoomRequest, CreateDataRoomResponse, ExecuteComputeRequest, ExecuteComputeResponse, GcgRequest, GcgResponse, GetResultsRequest, GetResultsResponseChunk, JobStatusRequest, JobStatusResponse, PublishDatasetToDataRoomRequest, PublishDatasetToDataRoomResponse, RemovePublishedDatasetRequest, RemovePublishedDatasetResponse, RetrieveAuditLogRequest, RetrieveAuditLogResponse, RetrieveDataRoomRequest, RetrieveDataRoomResponse, RetrieveDataRoomStatusRequest, RetrievePublishedDatasetsRequest, RetrievePublishedDatasetsResponse, UpdateDataRoomStatusRequest, ExecuteDevelopmentComputeRequest, CreateConfigurationCommitRequest, GenerateMergeApprovalSignatureRequest, MergeConfigurationCommitRequest, RetrieveDataRoomConfigurationHistoryRequest, RetrieveDataRoomConfigurationHistoryResponse, UpdateDataRoomStatusResponse, DataRoomStatus, AttestationSpecification, Fatquote, CreateConfigurationCommitRequest, RetrieveConfigurationCommitApproversRequest, RetrieveConfigurationCommitRequest ) from .proto.length_delimited import parse_length_delimited, serialize_length_delimited from .storage import Key from .types import FatquoteResBody, EnclaveMessage, ScopeTypes, JobId from .verification import QuoteBody, Verification from .platform import SessionPlatformFeatures if TYPE_CHECKING: from .client import Client __all__ = [ "Session", "LATEST_GCG_PROTOCOL_VERSION", "LATEST_WORKER_PROTOCOL_VERSION" ] LATEST_GCG_PROTOCOL_VERSION = 1 LATEST_WORKER_PROTOCOL_VERSION = 0 def _get_data_room_id(create_data_room_response: CreateDataRoomResponse) -> bytes: response_type = create_data_room_response.WhichOneof("create_data_room_response") if response_type is None: raise Exception("Empty CreateDataRoomResponse") elif response_type == "dataRoomId": return create_data_room_response.dataRoomId elif response_type == "dataRoomValidationError": raise Exception( "DataRoom creation failed", create_data_room_response.dataRoomValidationError ) else: raise Exception("Unknown response type for CreateDataRoomResponse", response_type) def datanoncepubkey_to_message( encrypted_data: bytes, nonce: bytes, pubkey: bytes, sigma_auth: Sigma ) -> DataNoncePubkey: message = DataNoncePubkey() message.data = encrypted_data message.nonce = nonce message.pubkey = pubkey message.pki.certChainPem = sigma_auth.get_cert_chain() message.pki.signature = sigma_auth.get_signature() message.pki.idMac = sigma_auth.get_mac_tag() return message def message_to_datanoncepubkey(message: bytes) -> Tuple[bytes, bytes, bytes]: parsed_msg = DataNoncePubkey() parse_length_delimited(message, parsed_msg) return (parsed_msg.data, parsed_msg.nonce, parsed_msg.pubkey) class Session(): """ Class for managing the communication with an enclave. 
""" client: Client session_id: str enclave_identifier: str auth: Auth email: str keypair: Any fatquote: Fatquote quote: QuoteBody driver_attestation_specification: AttestationSpecification client_protocols: List[int] def __init__( self, client: Client, session_id: str, driver_attestation_specification: AttestationSpecification, client_protocols: List[int], auth: Auth, platform_api: Optional[PlatformApi] = None, ): """ `Session` instances should not be instantiated directly but rather be created using a `Client` object using `decentriq_platform.Client.create_session`. """ url = Endpoints.SESSION_FATQUOTE.replace(":sessionId", session_id) response: FatquoteResBody = client._api.get(url).json() fatquote_bytes = b64decode(response["fatquoteBase64"]) fatquote = Fatquote() fatquote.ParseFromString(fatquote_bytes) verification = Verification(attestation_specification=driver_attestation_specification) report_data = verification.verify(fatquote) self.client = client self.session_id = session_id self.auth = auth self.email = auth.user_id self.keypair = chily.Keypair.from_random() self.fatquote = fatquote self.report_data = report_data self.driver_attestation_specification = driver_attestation_specification self.client_protocols = client_protocols if platform_api: self._platform = SessionPlatformFeatures(self, platform_api) else: self._platform = None def _get_client_protocol(self, endpoint_protocols: List[int]) -> int: try: protocol = max(set(self.client_protocols) & set(endpoint_protocols)) return protocol except ValueError: min_enclave_version = min(self.client_protocols) max_endpoint_version = min(endpoint_protocols) exception_message = "Endpoint is only available with protocol versions {} but the enclave only supports {}\n".format( endpoint_protocols, self.client_protocols ) if min_enclave_version > max_endpoint_version: exception_message += "Try upgrading to a newer version of the SDK" else: exception_message += "Try using an older version of the SDK" raise Exception(exception_message) def _get_enclave_pubkey(self): pub_keyB = bytearray(self.report_data[:32]) return chily.PublicKey.from_bytes(pub_keyB) def _encrypt_and_encode_data(self, data: bytes, auth: Auth) -> bytes: nonce = chily.Nonce.from_random() cipher = chily.Cipher( self.keypair.secret, self._get_enclave_pubkey() ) enc_data = cipher.encrypt("client sent session data", data, nonce) public_keys = bytes(self.keypair.public_key.bytes) + bytes(self._get_enclave_pubkey().bytes) signature = auth._sign(public_keys) shared_key = bytes(self.keypair.secret.diffie_hellman(self._get_enclave_pubkey()).bytes) hkdf = HKDF(algorithm=hashes.SHA512(), length=64, info=b"IdP KDF Context", salt=b"") mac_key = hkdf.derive(shared_key) mac_tag = hmac.digest(mac_key, auth._get_user_id().encode(), "sha512") sigma_auth = Sigma(signature, mac_tag, auth) return datanoncepubkey_to_message( bytes(enc_data), bytes(nonce.bytes), bytes(self.keypair.public_key.bytes), sigma_auth ) def _decode_and_decrypt_data(self, data: bytes) -> bytes: dec_data, nonceB, _ = message_to_datanoncepubkey(data) cipher = chily.Cipher( self.keypair.secret, self._get_enclave_pubkey() ) return cipher.decrypt("client received session data", dec_data, chily.Nonce.from_bytes(nonceB)) def send_request( self, request: GcgRequest, protocol: int, ) -> List[GcgResponse]: """ Low-level method for sending a raw `GcgRequest` to the enclave. Use this method if any of the convenience methods (such as `run_computation`) don't perform the exact task you want. 
""" gcg_protocol = serialize_length_delimited( ComputeNodeProtocol( version=protocol ) ) serialized_request = serialize_length_delimited( Request( deltaRequest= self._encrypt_and_encode_data( gcg_protocol + serialize_length_delimited(request), self.auth ) ) ) url = Endpoints.SESSION_MESSAGES.replace(":sessionId", self.session_id) enclave_request = EnclaveMessage(data=b64encode(serialized_request).decode("ascii")) enclave_response: bytes = self.client._api.post( url, json.dumps(enclave_request), {"Content-type": "application/json", "Accept-Version": "2"} ).content responses: List[GcgResponse] = [] offset = 0 while offset < len(enclave_response): response_container = Response() offset += parse_length_delimited(enclave_response[offset:], response_container) if response_container.HasField("unsuccessfulResponse"): raise Exception(response_container.unsuccessfulResponse) else: response = GcgResponse() decrypted_response = self._decode_and_decrypt_data( response_container.successfulResponse ) response_protocol = ComputeNodeProtocol() response_offset = parse_length_delimited( decrypted_response, response_protocol ) if response_protocol.version != protocol: raise Exception("Different response protocol version than requested") parse_length_delimited(decrypted_response[response_offset:], response) if response.HasField("failure"): raise Exception(response.failure) responses.append(response) return responses def _publish_data_room( self, data_room_definition: Tuple[DataRoom, List[ConfigurationModification]], ) -> CreateDataRoomResponse: endpoint_protocols = [1] protocol = self._get_client_protocol(endpoint_protocols) data_room, conf_modifications = data_room_definition if not data_room.ownerEmail: data_room.ownerEmail = self.email request = CreateDataRoomRequest( dataRoom=data_room, initialConfiguration=conf_modifications, ) responses = self.send_request(GcgRequest(createDataRoomRequest=request), protocol) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if response.HasField("createDataRoomResponse"): if response.createDataRoomResponse.HasField("dataRoomValidationError"): raise Exception( "Error when validating data room: {} (compute node id '{}')".format( response.createDataRoomResponse.dataRoomValidationError.message, response.createDataRoomResponse.dataRoomValidationError.computeNodeId ) ) else: if self.is_integrated_with_platform: data_room_hash = response.createDataRoomResponse.dataRoomId self._create_data_room_in_platform(data_room_definition, data_room_hash) else: raise Exception( "Expected createDataRoomResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.createDataRoomResponse def publish_data_room( self, data_room_definition: Tuple[DataRoom, List[ConfigurationModification]] ) -> str: """ Create a data room with the provided protobuf configuration object and have the enclave apply the given list of modifications to the data room configuration. The id returned from this method will be used when interacting with the published data room (for example when running computations or publishing datasets). """ response = self._publish_data_room(data_room_definition) return _get_data_room_id(response).hex() def publish_data_room_configuration_commit( self, configuration_commit: ConfigurationCommit ) -> str: """ Publish the given data room configuration commit. Configuration commits can be built using a `DataRoomCommitBuilder` object. 
    def publish_data_room_configuration_commit(
            self,
            configuration_commit: ConfigurationCommit
    ) -> str:
        """
        Publish the given data room configuration commit.

        Configuration commits can be built using a `DataRoomCommitBuilder`
        object.

        The id returned from this method will be used when running
        development computations or when trying to merge this commit into
        the main data room configuration.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": configuration_commit.dataRoomId.hex(),
            }
        )
        request = CreateConfigurationCommitRequest(
            commit=configuration_commit,
            scope=bytes.fromhex(scope_id)
        )
        responses = self.send_request(
            GcgRequest(createConfigurationCommitRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("createConfigurationCommitResponse"):
            raise Exception(
                "Expected createConfigurationCommitResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.createConfigurationCommitResponse.commitId.hex()

    def retrieve_configuration_commit(
            self,
            configuration_commit_id: str,
    ) -> ConfigurationCommit:
        """
        Retrieve the content of the configuration commit with the given id.

        **Returns**:
        A `ConfigurationCommit`.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = RetrieveConfigurationCommitRequest(
            commitId=bytes.fromhex(configuration_commit_id),
            scope=bytes.fromhex(scope_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveConfigurationCommitRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("retrieveConfigurationCommitResponse"):
            raise Exception(
                "Expected retrieveConfigurationCommitResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.retrieveConfigurationCommitResponse.commit

    def retrieve_configuration_commit_approvers(
            self,
            configuration_commit_id: str,
    ) -> List[str]:
        """
        Retrieve the list of users who need to approve the merging of a
        given configuration commit.

        **Returns**:
        A list of ids belonging to the users that need to approve the
        configuration commit.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = RetrieveConfigurationCommitApproversRequest(
            commitId=bytes.fromhex(configuration_commit_id),
            scope=bytes.fromhex(scope_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveConfigurationCommitApproversRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("retrieveConfigurationCommitApproversResponse"):
            raise Exception(
                "Expected retrieveConfigurationCommitApproversResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.retrieveConfigurationCommitApproversResponse.approvers
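    # Illustrative sketch (not part of the original source): inspecting who
    # must approve a configuration commit before it can be merged.
    #
    #     commit_id = session.publish_data_room_configuration_commit(commit)
    #     approvers = session.retrieve_configuration_commit_approvers(commit_id)
    #     print(f"Approval needed from: {', '.join(approvers)}")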
""" endpoint_protocols = [1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_COMMITS_DATA, } ) request = GenerateMergeApprovalSignatureRequest( commitId=bytes.fromhex(configuration_commit_id), scope=bytes.fromhex(scope_id), ) responses = self.send_request( GcgRequest(generateMergeApprovalSignatureRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("generateMergeApprovalSignatureResponse"): raise Exception( "Expected generateMergeApprovalSignatureResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.generateMergeApprovalSignatureResponse.signature def merge_configuration_commit( self, configuration_commit_id: str, approval_signatures: Dict[str, bytes], ) -> MergeConfigurationCommitResponse: """ Request the enclave to merge the given configuration commit into the main data room configuration. **Parameters**: - `configuration_commit_id`: The id of the commit to be merged. - `approval_signatures`: A dictionary containing the approval signature for each of the required approvers, e.g. `{ "some@email.com": signature }`. """ endpoint_protocols = [1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_COMMITS_DATA, } ) request = MergeConfigurationCommitRequest( scope=bytes.fromhex(scope_id), commitId=bytes.fromhex(configuration_commit_id), approvalSignatures=approval_signatures, ) responses = self.send_request( GcgRequest(mergeConfigurationCommitRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("mergeConfigurationCommitResponse"): raise Exception( "Expected mergeConfigurationCommitResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.mergeConfigurationCommitResponse def retrieve_data_room_configuration_history( self, data_room_id: str ) -> RetrieveDataRoomConfigurationHistoryResponse: """ Retrieve the current merged data room configuration, as well as the history of configuration commits that have already been merged. """ endpoint_protocols = [1] protocol = self._get_client_protocol(endpoint_protocols) request = RetrieveDataRoomConfigurationHistoryRequest( dataRoomId=bytes.fromhex(data_room_id), ) responses = self.send_request( GcgRequest(retrieveDataRoomConfigurationHistoryRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("retrieveDataRoomConfigurationHistoryResponse"): raise Exception( "Expected retrieveDataRoomConfigurationHistoryResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.retrieveDataRoomConfigurationHistoryResponse def retrieve_current_data_room_configuration( self, data_room_id: str ) -> Tuple[DataRoomConfiguration, str]: """ Retrieve the current data room confguration, as well as the current "history pin". A history pin is the hash of all the ids of configuration commits that make up the structure of a data room. This pin therefore uniquely identifies a data room's structure at a certain point in time. A data room configuration, as well as its associated history pin, can be used to extend an existing data room (for example by adding new compute nodes). Extending an existing data room is done using the `DataRoomCommitBuilder` class. 
""" response = self.retrieve_data_room_configuration_history(data_room_id) return (response.currentConfiguration, response.pin.hex()) def stop_data_room(self, data_room_id: str): """ Stop the data room with the given id, making it impossible to run new computations. """ self._update_data_room_status(data_room_id, DataRoomStatus.Value("Stopped")) def _update_data_room_status( self, data_room_id: str, status # type: DataRoomStatus.V ) -> UpdateDataRoomStatusResponse: """ Update the status of the data room. For the special case of stopping a data room, the method `stop_data_room` can be used. """ endpoint_protocols = [0, 1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA, "data_room_id": data_room_id, } ) request = UpdateDataRoomStatusRequest( scope=bytes.fromhex(scope_id), dataRoomId=bytes.fromhex(data_room_id), status=status, ) responses = self.send_request( GcgRequest(updateDataRoomStatusRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if response.HasField("updateDataRoomStatusResponse"): if self.is_integrated_with_platform: self.platform._platform_api.update_data_room_status(data_room_id, status) else: raise Exception( "Expected updateDataRoomStatusResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.updateDataRoomStatusResponse def retrieve_data_room_status( self, data_room_id: str ) -> str: """ Returns the status of the data room. Valid values are `"Active"` or `"Stopped"`. """ endpoint_protocols = [0, 1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA, "data_room_id": data_room_id, } ) request = RetrieveDataRoomStatusRequest( scope=bytes.fromhex(scope_id), dataRoomId=bytes.fromhex(data_room_id), ) responses = self.send_request( GcgRequest(retrieveDataRoomStatusRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("retrieveDataRoomStatusResponse"): raise Exception( "Expected retrieveDataRoomStatusResponse, got " + str(response.WhichOneof("gcg_response")) ) return DataRoomStatus.Name(response.retrieveDataRoomStatusResponse.status) def retrieve_data_room_definition( self, data_room_id: str ) -> RetrieveDataRoomResponse: """ Returns the underlying protobuf object for the data room. """ endpoint_protocols = [1] protocol = self._get_client_protocol(endpoint_protocols) request = RetrieveDataRoomRequest( dataRoomId=bytes.fromhex(data_room_id), ) responses = self.send_request( GcgRequest(retrieveDataRoomRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("retrieveDataRoomResponse"): raise Exception( "Expected retrieveDataRoomResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.retrieveDataRoomResponse def retrieve_audit_log( self, data_room_id: str ) -> RetrieveAuditLogResponse: """ Returns the audit log for the data room. 
""" endpoint_protocols = [0, 1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA, "data_room_id": data_room_id, } ) request = RetrieveAuditLogRequest( scope=bytes.fromhex(scope_id), dataRoomId=bytes.fromhex(data_room_id), ) responses = self.send_request( GcgRequest(retrieveAuditLogRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("retrieveAuditLogResponse"): raise Exception( "Expected retrieveAuditLogResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.retrieveAuditLogResponse def publish_dataset( self, data_room_id: str, manifest_hash: str, leaf_id: str, key: Key, *, force: bool = False ) -> PublishDatasetToDataRoomResponse: """ Publishes a file and its encryption key to a data room. Neither the file or the encryption key will ever be stored in unencrypted form. This method will check whether the to-be-published file exists. If this is not the case, an exception will be raised. This behavior can be disabled by setting the `force` flag. In case the original client was created with platform integration enabled, the method will further check whether there already is a dataset published for the given data room. In this case, an exception will be thrown and the dataset will need to be unpublished first. """ endpoint_protocols = [0, 1] protocol = self._get_client_protocol(endpoint_protocols) dataset = self.client.get_dataset(manifest_hash) if not dataset and not force: raise Exception( "The dataset you are trying to publish does not exist" ) should_create_dataset_links =\ self.is_integrated_with_platform and \ self._is_web_data_room(data_room_id) if should_create_dataset_links: existing_link = self.platform._platform_api.get_dataset_link( data_room_id, self._convert_data_node_name_to_verifier_node(leaf_id) ) if existing_link: existing_dataset = existing_link["dataset"]["datasetId"] raise Exception( "The following dataset has already been published for this node." + " Please unpublish this dataset first." + f" Dataset: '{existing_dataset}'" ) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA, "data_room_id": data_room_id, } ) request = PublishDatasetToDataRoomRequest( dataRoomId=bytes.fromhex(data_room_id), datasetHash=bytes.fromhex(manifest_hash), leafName=leaf_id, encryptionKey=key.material, scope=bytes.fromhex(scope_id) ) responses = self.send_request( GcgRequest(publishDatasetToDataRoomRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("publishDatasetToDataRoomResponse"): raise Exception( "Expected publishDatasetToDataRoomResponse, got " + str(response.WhichOneof("gcg_response")) ) else: if should_create_dataset_links: self.platform._platform_api.create_dataset_link( data_room_id, manifest_hash, self._convert_data_node_name_to_verifier_node(leaf_id) ) return response.publishDatasetToDataRoomResponse def _convert_data_node_name_to_verifier_node(self, node_name: str) -> str: pattern = re.compile(r"@table/(.*)/dataset") match = pattern.match(node_name) if match: return match.group(1) else: return node_name def remove_published_dataset( self, data_room_id: str, leaf_id: str, ) -> RemovePublishedDatasetResponse: """ Removes a published dataset from the data room. 
""" endpoint_protocols = [0, 1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA, "data_room_id": data_room_id, } ) request = RemovePublishedDatasetRequest( scope=bytes.fromhex(scope_id), dataRoomId=bytes.fromhex(data_room_id), leafName=leaf_id, ) responses = self.send_request( GcgRequest(removePublishedDatasetRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("removePublishedDatasetResponse"): raise Exception( "Expected removePublishedDatasetResponse, got " + str(response.WhichOneof("gcg_response")) ) else: if self.is_integrated_with_platform and self._is_web_data_room(data_room_id): try: self.platform._platform_api.delete_dataset_link( data_room_id, self._convert_data_node_name_to_verifier_node(leaf_id) ) except Exception as error: print(f"Error when deleting dataset link on platform: {error}") return response.removePublishedDatasetResponse def retrieve_published_datasets( self, data_room_id: str, ) -> RetrievePublishedDatasetsResponse: """ Returns the datasets published to the given data room. """ endpoint_protocols = [0, 1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA, "data_room_id": data_room_id, } ) request = RetrievePublishedDatasetsRequest( scope=bytes.fromhex(scope_id), dataRoomId=bytes.fromhex(data_room_id), ) responses = self.send_request( GcgRequest(retrievePublishedDatasetsRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("retrievePublishedDatasetsResponse"): raise Exception( "Expected retrievePublishedDatasetResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.retrievePublishedDatasetsResponse def _submit_dev_compute( self, configuration_commit_id: str, compute_node_ids: List[str], /, *, dry_run: bool = False, ) -> ExecuteComputeResponse: """ Submits a computation request which will generate an execution plan to perform the computation of the goal nodes """ endpoint_protocols = [1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_COMMITS_DATA, } ) request = ExecuteDevelopmentComputeRequest( configurationCommitId=bytes.fromhex(configuration_commit_id), computeNodeNames=compute_node_ids, isDryRun=dry_run, scope=bytes.fromhex(scope_id) ) responses = self.send_request( GcgRequest(executeDevelopmentComputeRequest=request), protocol ) if len(responses) != 1: raise Exception("Malformed response") response = responses[0] if not response.HasField("executeComputeResponse"): raise Exception( "Expected executeComputeResponse, got " + str(response.WhichOneof("gcg_response")) ) return response.executeComputeResponse def _submit_compute( self, data_room_id: str, compute_node_ids: List[str], /, *, dry_run: bool = False, ) -> ExecuteComputeResponse: """ Submits a computation request which will generate an execution plan to perform the computation of the goal nodes """ endpoint_protocols = [0, 1] protocol = self._get_client_protocol(endpoint_protocols) scope_id = self.client._ensure_scope_with_metadata( self.email, { "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA, "data_room_id": data_room_id, } ) request = ExecuteComputeRequest( 
    def _submit_compute(
            self,
            data_room_id: str,
            compute_node_ids: List[str],
            /, *,
            dry_run: bool = False,
    ) -> ExecuteComputeResponse:
        """
        Submits a computation request which will generate an execution plan
        to perform the computation of the goal nodes.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = ExecuteComputeRequest(
            dataRoomId=bytes.fromhex(data_room_id),
            computeNodeNames=compute_node_ids,
            isDryRun=dry_run,
            scope=bytes.fromhex(scope_id)
        )
        responses = self.send_request(
            GcgRequest(executeComputeRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("executeComputeResponse"):
            raise Exception(
                "Expected executeComputeResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.executeComputeResponse

    def get_computation_status(self, job_id: str) -> JobStatusResponse:
        """
        Returns the status of the provided `job_id`, which will include the
        names of the nodes that completed their execution.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        request = JobStatusRequest(
            jobId=bytes.fromhex(job_id),
        )
        responses = self.send_request(
            GcgRequest(jobStatusRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("jobStatusResponse"):
            raise Exception(
                "Expected jobStatusResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.jobStatusResponse

    def _stream_job_results(
            self,
            job_id: bytes,
            compute_node_id: str,
    ) -> Iterator[GetResultsResponseChunk]:
        """
        Streams the results of the provided `job_id`.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        request = GetResultsRequest(
            jobId=job_id,
            computeNodeName=compute_node_id,
        )
        responses = self.send_request(
            GcgRequest(getResultsRequest=request),
            protocol
        )
        for response in responses:
            if response.HasField("getResultsResponseChunk"):
                yield response.getResultsResponseChunk
            elif response.HasField("getResultsResponseFooter"):
                return
            else:
                raise Exception(
                    "Expected getResultsResponseChunk or getResultsResponseFooter, got "
                    + str(response.WhichOneof("gcg_response"))
                )
        raise Exception("Enclave connection aborted while streaming results")

    def _get_job_results(
            self,
            job_id: bytes,
            compute_node_id: str,
    ) -> bytes:
        """
        Returns the results of the provided `job_id`.
        """
        return b"".join(
            chunk.data
            for chunk in self._stream_job_results(job_id, compute_node_id)
        )

    def run_computation(
            self,
            data_room_id: str,
            compute_node_id: str,
            /, *,
            dry_run: bool = False,
    ) -> JobId:
        """
        Run a specific computation within the data room with the given id.

        The result will be an identifier object of the job executing the
        computation. This object is required for checking a job's status
        and retrieving its results.
        """
        response = self._submit_compute(data_room_id, [compute_node_id], dry_run=dry_run)
        return JobId(response.jobId.hex(), compute_node_id)
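    # Illustrative sketch (not part of the original source): starting a
    # computation and polling its status.
    #
    #     job_id = session.run_computation(data_room_id, "my_computation")
    #     status = session.get_computation_status(job_id.id)
    #     print(status.completeComputeNodeNames)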
""" elapsed = 0 while True: if timeout is not None and elapsed > timeout: raise Exception( f"Timeout when trying to get result for job {job_id.id} of" f" {job_id.compute_node_id} (waited {timeout} seconds)" ) elif job_id.compute_node_id in self.get_computation_status(job_id.id).completeComputeNodeNames: break else: sleep(interval) elapsed += interval def run_dev_computation( self, configuration_commit_id: str, compute_node_id: str, /, *, dry_run: bool = False, ) -> JobId: """ Run a specific computation within the context of the data room configuration defined by the given commit id. Such "development" computations can also be run for configuration commits that have not yet been merged. The result will be an identifier object of the job executing the computation. This object is required for checking a job's status and retrieving its results. """ response = self._submit_dev_compute(configuration_commit_id, [compute_node_id], dry_run=dry_run) return JobId(response.jobId.hex(), compute_node_id) def get_computation_result( self, job_id: JobId, /, *, interval: int = 5, timeout: int = None ) -> bytes: """ Wait for the given job to complete and retrieve its results as a raw byte string. The method will check for the job's completeness every `interval` seconds and up to an optional `timeout` seconds after which the method will raise an exception. If the job completes and the results can be retrieved successfully, a raw byte string will be returned. The bytes tring can be transformed into a more useful object using a variety of helper methods. These helper methods are specific for the type of computation you ran and can be found in the corresponding packages. """ elapsed = 0 job_id_bytes = bytes.fromhex(job_id.id) self.wait_until_computation_has_finished( job_id, interval=interval, timeout=timeout ) results = self._get_job_results(job_id_bytes, job_id.compute_node_id) return results def run_computation_and_get_results( self, data_room_id: str, compute_node_id: str, /, *, interval: int = 5, timeout: int = None ) -> Optional[bytes]: """ Run a specific computation and return its results. 
    def run_computation_and_get_results(
            self,
            data_room_id: str,
            compute_node_id: str,
            /, *,
            interval: int = 5,
            timeout: Optional[int] = None
    ) -> Optional[bytes]:
        """
        Run a specific computation and return its results.

        This method is simply a wrapper for running `run_computation` and
        `get_computation_result` directly after each other.
        """
        job_id = self.run_computation(data_room_id, compute_node_id)
        return self.get_computation_result(
            job_id,
            interval=interval,
            timeout=timeout
        )

    def _create_data_room_in_platform(
            self,
            data_room_definition: DataRoom,
            data_room_hash: bytes
    ):
        attestation_spec_type = (
            self.driver_attestation_specification.WhichOneof("attestation_specification")
        )

        if attestation_spec_type == "intelEpid":
            mrenclave = self.driver_attestation_specification.intelEpid.mrenclave
        elif attestation_spec_type == "intelDcap":
            mrenclave = self.driver_attestation_specification.intelDcap.mrenclave
        else:
            raise Exception("Unknown attestation specification type")

        attestation_specification_hash = hashlib.sha256(
            serialize_length_delimited(
                self.driver_attestation_specification
            )
        ).hexdigest()

        self.platform._platform_api.publish_data_room(
            data_room_definition,
            data_room_hash=data_room_hash.hex(),
            attestation_specification_hash=attestation_specification_hash
        )

    def _is_web_data_room(self, data_room_id: str) -> bool:
        data_room = self.platform._platform_api.get_data_room_by_hash(data_room_id)
        if data_room:
            return data_room["source"] == "WEB"
        else:
            raise Exception(f"Unable to find data room with id '{data_room_id}'")

    @property
    def platform(self) -> SessionPlatformFeatures:
        """
        Provider of a list of convenience methods to interact with the
        Decentriq platform.

        This field exposes an object that provides a set of convenience
        features known from the Decentriq web platform. These include, for
        example, getting the list of datasets that are also visible in the
        browser-based platform UI.
        """
        if self._platform:
            return self._platform
        else:
            raise Exception(
                "This field is not set, as the client from which this session"
                " has been derived has not been configured with integration"
                " to the web platform."
            )

    @property
    def is_integrated_with_platform(self):
        return self._platform is not None
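As a usage illustration (a minimal sketch, assuming a `Session` created from a platform-integrated client), the `platform` property gives access to the enclave-backed convenience methods shown above:

if session.is_integrated_with_platform:
    for dataset in session.platform.get_published_datasets(data_room_id):
        print(dataset)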
View Source
from typing import List, Any, Optional
import uuid

__all__ = [
    "Node"
]


class Node:
    name: str
    """The name of the node."""

    config: bytes
    """Serialized configuration to use in the compute node definition."""

    enclave_type: str
    """What type of enclave this node uses."""

    dependencies: List[str]
    """A list of names of nodes on which this node directly depends."""

    output_format: Any
    """The `proto.ComputeNodeFormat` of the output from this node."""

    def __init__(
            self,
            name: str,
            config: bytes,
            enclave_type: str,
            dependencies: List[str],
            output_format: Any
    ):
        self.name = name
        self.config = config
        self.enclave_type = enclave_type
        self.dependencies = dependencies
        self.output_format = output_format
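For illustration, here is a minimal sketch of constructing a `Node` directly. In practice, node definitions are normally produced by the compute modules (such as `decentriq_platform.sql`); the name, configuration bytes, enclave type, and output format below are hypothetical placeholders rather than values from this documentation:

from decentriq_platform.node import Node

node = Node(
    name="my_computation",            # placeholder node name
    config=b"",                       # placeholder: serialized compute node configuration
    enclave_type="decentriq.sql-worker",  # assumed enclave identifier
    dependencies=["my_table"],        # names of upstream nodes
    output_format=None                # placeholder for a proto.ComputeNodeFormat value
)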