API
Datafile
- class octue.resources.datafile.Datafile(path, local_path=None, cloud_path=None, timestamp=None, mode='r', update_metadata=True, ignore_stored_metadata=False, id=None, tags=None, labels=None, **kwargs)
A representation of a data file with metadata.
Metadata consists of id, timestamp, tags, and labels, available as attributes on the instance. On instantiation, metadata for the file is obtained from its stored location (the corresponding cloud object metadata or a local .octue metadata file) if present. Metadata values can alternatively be passed as arguments at instantiation but will only be used if stored metadata cannot be found - i.e. stored metadata always takes precedence (use the ignore_stored_metadata parameter to override this behaviour). Stored metadata can be updated after instantiation using the update_metadata method.
- Parameters:
path (str|None) – The path of this file locally or in the cloud, which may include folders or subfolders, within the dataset
local_path (str|None) – If a cloud path is given as the path parameter, this is the path to an existing local file that is known to be in sync with the cloud object
cloud_path (str|None) – If a local path is given for the path parameter, this is a cloud path to keep in sync with the local file
timestamp (datetime.datetime|int|float|None) – A posix timestamp associated with the file, in seconds since epoch, typically when it was created but could relate to a relevant time point for the data
mode (str) – if using as a context manager, open the datafile for reading/editing in this mode (the mode options are the same as for the builtin open function)
update_metadata (bool) – if using as a context manager and this is True, update the stored metadata of the datafile when the context is exited
ignore_stored_metadata (bool) – if True, ignore any metadata stored for this datafile locally or in the cloud and use whatever is given at instantiation
id (str) – The Universally Unique ID of this file (checked to be valid if not None, generated if None)
tags (dict|octue.resources.tag.TagDict|None) – key-value pairs with string keys conforming to the Octue tag format (see TagDict)
labels (iter(str)|octue.resources.label.LabelSet|None) – Space-separated string of labels relevant to this file
- Return None:
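The precedence rule described above can be modelled in a few lines. This is a minimal sketch of the documented behaviour, not the library's implementation: `resolve_metadata` and its arguments are hypothetical names, and it assumes stored metadata is used wholesale whenever it is found.

```python
def resolve_metadata(stored, given, ignore_stored_metadata=False):
    """Pick the metadata a new instance would end up with.

    `stored` is the metadata found at the stored location (None if nothing
    was found); `given` is the metadata passed as arguments at instantiation.
    Both names are hypothetical and only used for this illustration.
    """
    if stored is not None and not ignore_stored_metadata:
        # Stored metadata takes precedence when it exists.
        return dict(stored)
    # Nothing was stored, or the caller asked for it to be ignored.
    return dict(given)


stored = {"tags": {"site": "a"}, "labels": ["raw"]}
given = {"tags": {"site": "b"}, "labels": ["clean"]}

print(resolve_metadata(stored, given))
print(resolve_metadata(stored, given, ignore_stored_metadata=True))
print(resolve_metadata(None, given))
```

The second call mirrors passing ignore_stored_metadata=True to the constructor; the third mirrors a file with no stored metadata.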
- classmethod deserialise(serialised_datafile, from_string=False)
Deserialise a Datafile from a dictionary or JSON string.
- Parameters:
serialised_datafile (dict|str)
from_string (bool)
- Return Datafile:
- property name
Get the name of the datafile.
- Return str:
- property extension
Get the extension of the datafile.
- Return str:
- property cloud_path
Get the cloud path of the datafile.
- Return str|None:
- property cloud_hash_value
Get the hash value of the datafile according to its cloud file.
- Return str|None:
None if no cloud metadata is available
- property metadata_path
Get the path to the datafile’s local metadata file (if the datafile exists locally).
- Return str|None:
- property timestamp
Get the timestamp of the datafile.
- Return float:
- property posix_timestamp
Get the timestamp of the datafile in posix format.
- Return float:
- property size_bytes
Get the size of the datafile in bytes.
- Return float|None:
- property exists_locally
Return True if the file exists locally.
- Return bool:
- property local_path
Get the local path for the datafile, downloading it from the cloud to a temporary file if necessary.
- Return str:
The local path of the datafile.
- property open
Open the datafile for reading/writing. Usage is the same as the Python built-in open function, except that it can only be used as a context manager, e.g.
with datafile.open("w") as f:
    f.write("some data")
- upload(cloud_path=None, update_cloud_metadata=True)
Upload a datafile to Google Cloud Storage.
- Parameters:
cloud_path (str|None) – full path to cloud storage location to store datafile at (e.g. gs://bucket_name/path/to/file.csv)
update_cloud_metadata (bool) – if True, update the metadata of the datafile in the cloud at upload time
- Return str:
gs:// path for datafile
- download(local_path=None)
Download the file from the cloud to the given local path or a temporary path if none is given.
- Parameters:
local_path (str|None) – The local path to download the datafile to. A temporary path is used if none is given.
- Raises:
octue.exceptions.CloudLocationNotSpecified – If the datafile does not exist in the cloud
- Return str:
The path to the local file
- metadata(include_id=True, include_sdk_version=True, use_octue_namespace=True)
Get the datafile’s metadata in a serialised form (i.e. the attributes id, timestamp, labels, tags, and sdk_version).
- Parameters:
include_id (bool) – if True, include the ID of the datafile
include_sdk_version (bool) – if True, include the octue version that instantiated the datafile in the metadata
use_octue_namespace (bool) – if True, prefix metadata names with “octue__”
- Return dict:
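The effect of use_octue_namespace can be pictured as a key prefix applied to each metadata name. The sketch below is an illustration under that reading, not the library's code; `apply_octue_namespace` is a hypothetical helper.

```python
def apply_octue_namespace(metadata, use_octue_namespace=True):
    """Return a copy of `metadata` with names optionally prefixed with "octue__".

    Hypothetical helper that mimics the documented effect of the
    `use_octue_namespace` parameter.
    """
    if not use_octue_namespace:
        return dict(metadata)
    return {f"octue__{name}": value for name, value in metadata.items()}


metadata = {"id": "abc", "labels": ["raw"], "tags": {"site": "a"}}
print(apply_octue_namespace(metadata))
print(apply_octue_namespace(metadata, use_octue_namespace=False))
```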
- update_metadata()
Using the datafile instance’s in-memory metadata, update its cloud metadata (if the datafile is cloud-based) or its local metadata file (if the datafile is local).
- Return None:
- update_cloud_metadata()
Update the cloud metadata for the datafile.
- Return None:
- update_local_metadata()
Create or update the local octue metadata file with the datafile’s metadata.
- Return None:
- generate_signed_url(expiration=datetime.timedelta(days=7))
Generate a signed URL for the datafile.
- Parameters:
expiration (datetime.datetime|datetime.timedelta) – the amount of time or date after which the URL should expire
- Return str:
the signed URL for the datafile
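The expiration parameter accepts either a duration or an absolute date. The following sketch shows one way the two forms can be reduced to a single expiry time; `expiry_time` and its `now` argument are hypothetical and are not part of the documented API.

```python
import datetime


def expiry_time(expiration, now=None):
    """Convert a timedelta or datetime expiration into an absolute datetime.

    Hypothetical helper: a timedelta is measured from `now`, while a datetime
    is taken to already be the moment of expiry.
    """
    now = now or datetime.datetime.now(datetime.timezone.utc)
    if isinstance(expiration, datetime.timedelta):
        return now + expiration
    return expiration


start = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
print(expiry_time(datetime.timedelta(days=7), now=start))
print(expiry_time(datetime.datetime(2024, 3, 1, tzinfo=datetime.timezone.utc), now=start))
```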
- to_primitive()
Convert the datafile to a dictionary of primitives. A path key is added to the primitive to facilitate easier deserialisation as datafile instantiation requires a path parameter. The value mapped to this key is the local path if the datafile has one, or the cloud path if it doesn’t.
- Return dict:
- add_labels(*args)
Add one or more new labels to the object. New labels will be cleaned and validated.
- add_tags(tags=None, **kwargs)
Add one or more new tags to the object. New tags will be cleaned and validated.
- classmethod from_file(path, **kwargs)
Deserialise an instance from the given file.
- Parameters:
path (str) – the path to the JSON file containing the serialised instance
kwargs – kwargs to pass in to the JSON deserialisation
- Return any:
an instance of the class
- property hash_value
Get the hash of the instance.
- Return str:
- property id
Get the ID of the identifiable instance.
- Return str:
- property labels
Get the labels of the labelled object.
- Return iter:
- reset_hash()
Reset the hash value to the calculated hash (rather than whatever value has been set).
- Return None:
- serialise(**kwargs)
Serialise the instance to a JSON string of primitives. See the Serialisable constructor for more information.
- Parameters:
kwargs – kwargs to pass in to the JSON serialisation
- Return str:
a JSON string containing the instance as a serialised python primitive
- property tags
Get the tags of the taggable instance.
- Return iter:
- to_file(path, **kwargs)
Write the instance to a JSON file.
- Parameters:
path (str) – path of file to write to, including relative or absolute path and .json extension
kwargs – kwargs to pass in to the JSON serialisation
- Return None:
Dataset
- class octue.resources.dataset.Dataset(path=None, files=None, recursive=True, ignore_stored_metadata=False, include_octue_metadata_files=False, id=None, name=None, tags=None, labels=None)
A representation of a dataset with metadata.
The default usage is to provide the path to a local or cloud directory and create the dataset from the files it contains. Alternatively, the files parameter can be provided and only those files are included. Either way, the path parameter should be explicitly set to something meaningful.
Metadata consists of id, name, tags, and labels, available as attributes on the instance. On instantiation, metadata for the dataset is obtained from its stored location (the corresponding cloud object metadata or a local .octue metadata file) if present. Metadata values can alternatively be passed as arguments at instantiation but will only be used if stored metadata cannot be found - i.e. stored metadata always takes precedence (use the ignore_stored_metadata parameter to override this behaviour). Stored metadata can be updated after instantiation using the update_metadata method.
- Parameters:
path (str|None) – the path to the dataset (defaults to the current working directory if none is given)
files (iter(str|dict|octue.resources.datafile.Datafile)|None) – the files belonging to the dataset
recursive (bool) – if True, include in the dataset all files in the subdirectories recursively contained within the dataset directory
ignore_stored_metadata (bool) – if True, ignore any metadata stored for this dataset locally or in the cloud and use whatever is given at instantiation
include_octue_metadata_files (bool) – if True, include .octue metadata files as datafiles in the dataset when instantiating it
id (str|None) – an optional UUID to assign to the dataset (defaults to a random UUID if none is given)
name (str|None) – an optional name to give to the dataset (defaults to the dataset directory name)
tags (dict|octue.resources.tag.TagDict|None) – key-value pairs with string keys conforming to the Octue tag format (see TagDict)
labels (iter(str)|octue.resources.label.LabelSet|None) – space-separated string of labels relevant to the dataset
- Return None:
- property name
Get the name of the dataset.
- Return str:
- property exists_locally
Return True if the dataset exists locally.
- Return bool:
- property all_files_are_in_cloud
Do all the files of the dataset exist in the cloud?
- Return bool:
- property metadata_path
Get the path to the dataset’s metadata file.
- Return str:
- upload(cloud_path=None, update_cloud_metadata=True)
Upload a dataset to the given cloud path.
- Parameters:
cloud_path (str|None) – cloud path to store dataset at (e.g. gs://bucket_name/path/to/dataset)
update_cloud_metadata (bool) – if True, update the metadata of the dataset in the cloud at upload time
- Return str:
cloud path for dataset
- update_metadata()
Using the dataset instance’s in-memory metadata, update its cloud metadata (if the dataset is cloud-based) or its local metadata file (if the dataset is local).
- Return None:
- update_cloud_metadata()
Create or update the cloud metadata file for the dataset.
- Return None:
- update_local_metadata()
Create or update the local octue metadata file with the dataset’s metadata.
- Return None:
- generate_signed_url(expiration=datetime.timedelta(days=7))
Generate a signed URL for the dataset. This is done by uploading a uniquely named metadata file containing signed URLs to the dataset’s files and returning a signed URL to that metadata file.
- Parameters:
expiration (datetime.datetime|datetime.timedelta) – the amount of time or date after which the URL should expire
- Return str:
the signed URL for the dataset
- add(datafile, path_in_dataset=None)
Add a datafile to the dataset. If the datafile’s location is outside the dataset, it is copied to the dataset root or to the path_in_dataset if provided.
- Parameters:
datafile (octue.resources.datafile.Datafile) – the datafile to add to the dataset
path_in_dataset (str|None) – if provided, set the datafile’s local path to this path within the dataset
- Raises:
octue.exceptions.InvalidInputException – if the datafile is not a Datafile instance
- Return None:
- get_file_by_label(label)
Get a single datafile from a dataset by filtering for files with the provided label.
- Parameters:
label (str) – the label to filter for
- Raises:
octue.exceptions.UnexpectedNumberOfResultsException – if zero or more than one results satisfy the filters
- Return octue.resources.datafile.Datafile:
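The "exactly one match" contract of get_file_by_label can be sketched as follows. Files are modelled here as plain dictionaries and the exception is a local stand-in, so this is only an illustration of the contract and not the library's implementation.

```python
class UnexpectedNumberOfResults(Exception):
    """Hypothetical stand-in for octue.exceptions.UnexpectedNumberOfResultsException."""


def get_single_by_label(files, label):
    """Return the only file carrying `label`, or raise if there are zero or several."""
    matches = [file for file in files if label in file["labels"]]
    if len(matches) != 1:
        raise UnexpectedNumberOfResults(
            f"Expected exactly one file labelled {label!r}, found {len(matches)}."
        )
    return matches[0]


files = [
    {"name": "a.csv", "labels": {"raw"}},
    {"name": "b.csv", "labels": {"raw", "calibrated"}},
]
print(get_single_by_label(files, "calibrated")["name"])
```

Asking for "raw" (two matches) or for a label no file has (zero matches) raises the stand-in exception.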
- download(local_directory=None)
Download all files in the dataset.
- Parameters:
local_directory (str|None) – the path to a local directory to download the dataset into; if not provided, the files will be downloaded to a temporary directory
- Return str:
the absolute path to the local directory
- to_primitive(include_files=True)
Convert the dataset to a dictionary of primitives, converting its files into their paths for a lightweight serialisation.
- Parameters:
include_files (bool) – if True, include the files parameter in the dictionary
- Return dict:
Manifest
- class octue.resources.manifest.Manifest(datasets=None, ignore_stored_metadata=False, id=None, name=None)
A representation of a manifest, which can contain multiple datasets. This is used to manage all files coming into (or leaving) a data service for an analysis at the configuration, input or output stage.
- Parameters:
datasets (dict(str, octue.resources.dataset.Dataset|dict|str)|None) – a mapping of dataset names to Dataset instances, serialised datasets, or paths to datasets
ignore_stored_metadata (bool) – if True, ignore any metadata stored for the manifest’s datasets and datafiles locally or in the cloud
id (str|None) – the UUID of the manifest (a UUID is generated if one isn’t given)
name (str|None) – an optional name to give to the manifest
- Return None:
- classmethod from_cloud(cloud_path, ignore_stored_metadata=False)
Instantiate a manifest from a JSON serialisation of one in Google Cloud Storage.
- Parameters:
cloud_path (str) – full path to manifest in cloud storage (e.g. gs://bucket_name/path/to/manifest.json)
ignore_stored_metadata (bool) – if True, ignore any metadata stored for the manifest’s datasets and datafiles in the cloud
- Return octue.resources.manifest.Manifest:
- property all_datasets_are_in_cloud
Do all the files of all the datasets of the manifest exist in the cloud?
- Return bool:
- download(paths=None, download_all=True)
Download all datasets in the manifest. If no paths are provided, all datasets are downloaded to a temporary directory.
- Parameters:
paths (dict|None) – a mapping of dataset name to download directory path; if not provided, datasets are downloaded to temporary directories
download_all (bool) – if False and paths is provided, only download the datasets specified in paths
- Return dict(str, str):
the downloaded datasets mapped to the absolute paths of the directories they were downloaded into
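One reading of how paths and download_all interact is sketched below. `plan_downloads` is a hypothetical helper that only decides which datasets would be downloaded and where; it assumes that download_all=False has no effect when paths is not provided, as the parameter description suggests.

```python
def plan_downloads(dataset_names, paths=None, download_all=True):
    """Map each dataset to be downloaded to its target directory.

    A value of None stands for "a temporary directory". Hypothetical helper
    that models the documented parameters; it does not download anything.
    """
    paths = paths or {}
    plan = {}
    for name in dataset_names:
        if name in paths:
            plan[name] = paths[name]
        elif download_all or not paths:
            # Datasets without an explicit path go to a temporary directory.
            plan[name] = None
    return plan


names = ["met_mast", "scada"]
print(plan_downloads(names))
print(plan_downloads(names, paths={"scada": "/data/scada"}))
print(plan_downloads(names, paths={"scada": "/data/scada"}, download_all=False))
```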
- update_dataset_paths(path_generator)
Update the path of each dataset according to the given path generator function. This method is thread-safe.
- Parameters:
path_generator (callable) – a function taking a Dataset as its only argument and returning the new path of the dataset
- Return None:
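A path generator is any callable that takes a dataset and returns its new path. The sketch below builds one that places each dataset under a common root using the dataset's name property; `make_path_generator`, the bucket name, and the stand-in object are illustrative assumptions.

```python
from types import SimpleNamespace


def make_path_generator(cloud_root):
    """Build a path generator that places each dataset under `cloud_root`."""
    root = cloud_root.rstrip("/")

    def path_generator(dataset):
        # Only the documented `name` property of the dataset is used.
        return f"{root}/{dataset.name}"

    return path_generator


generator = make_path_generator("gs://my-bucket/datasets/")

# A stand-in object with a `name` attribute is used in place of a real Dataset.
stand_in_dataset = SimpleNamespace(name="met_mast_data")
print(generator(stand_in_dataset))  # gs://my-bucket/datasets/met_mast_data
```

With a real manifest, the generator would be passed as `manifest.update_dataset_paths(generator)`.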
- use_signed_urls_for_datasets()
Generate signed URLs for any cloud datasets in the manifest and use these as their paths instead of regular cloud paths. URLs will not be generated for any local datasets or datasets whose paths are already URLs (including those whose paths are already signed), making this method idempotent.
- Return None:
- to_cloud(cloud_path)
Upload a manifest to a cloud location, optionally uploading its datasets into the same directory.
- Parameters:
cloud_path (str) – full path to cloud storage location to store manifest at (e.g. gs://bucket_name/path/to/manifest.json)
- Return None:
- get_dataset(key)
Get a dataset by its key (as defined in the twine).
- Parameters:
key (str)
- Return octue.resources.dataset.Dataset:
- prepare(data)
Prepare new manifest from a manifest_spec.
- Parameters:
data (dict)
- Return Manifest:
- to_primitive()
Convert the manifest to a dictionary of primitives, converting its datasets into their paths for a lightweight serialisation.
- Return dict:
- classmethod deserialise(serialised_object, from_string=False)
Deserialise the given JSON-serialised object into an instance of the class.
- Parameters:
serialised_object (str|dict) – the string or dictionary of python primitives to deserialise into an instance
from_string (bool) – if True, deserialise from a JSON string; otherwise, deserialise from a dictionary
- Return any:
an instance of the class
- classmethod from_file(path, **kwargs)
Deserialise an instance from the given file.
- Parameters:
path (str) – the path to the JSON file containing the serialised instance
kwargs – kwargs to pass in to the JSON deserialisation
- Return any:
an instance of the class
- classmethod hash_non_class_object(object_)
Use the Hashable class to hash an arbitrary object that isn’t an attribute of a class instance.
- Parameters:
object_ (any)
- Return str:
- property hash_value
Get the hash of the instance.
- Return str:
- property id
Get the ID of the identifiable instance.
- Return str:
- metadata(include_id=True, include_sdk_version=True, **kwargs)
Get the instance’s metadata in primitive form. The metadata is the set of attributes included in the class variable self._METADATA_ATTRIBUTES.
- Parameters:
include_id (bool) – if True, include the ID of the instance if it is included in self._METADATA_ATTRIBUTES
include_sdk_version (bool) – if True, include the octue version that instantiated the instance
kwargs – any kwargs to use in an overridden self.metadata method
- Return dict:
- property metadata_hash_value
Get the hash of the instance’s metadata, not including its ID.
- Return str:
- abstract property metadata_path
Get the path to the instance’s local metadata file if it has one.
- Return str|None:
- property name
Get the name of the identifiable instance.
- Return str:
- reset_hash()
Reset the hash value to the calculated hash (rather than whatever value has been set).
- Return None:
- serialise(**kwargs)
Serialise the instance to a JSON string of primitives. See the Serialisable constructor for more information.
- Parameters:
kwargs – kwargs to pass in to the JSON serialisation
- Return str:
a JSON string containing the instance as a serialised python primitive
- to_file(path, **kwargs)
Write the instance to a JSON file.
- Parameters:
path (str) – path of file to write to, including relative or absolute path and .json extension
kwargs – kwargs to pass in to the JSON serialisation
- Return None:
Analysis
- class octue.resources.analysis.Analysis(twine, handle_monitor_message=None, **kwargs)
A class representing a scientific or computational analysis. It holds references to all configuration, input, and output data, logs, connections to child services, credentials, etc. It’s essentially the “Internal API” for your service - a single point of contact where you can get or update anything you need.
An Analysis instance is automatically provided to the app in an Octue service when a question is received. Its attributes include every strand that can be added to a Twine, although only the strands specified in the service’s twine will be non-None. Incoming data is validated before it’s added to the analysis.
All input and configuration attributes are hashed using a BLAKE3 hash so the inputs and configuration that produced a given output in your app can always be verified. These hashes exist on the following attributes:
input_values_hash
input_manifest_hash
configuration_values_hash
configuration_manifest_hash
If a strand is None, so will its corresponding hash attribute be. The hash of a datafile is the hash of its file, while the hash of a manifest or dataset is the cumulative hash of the files it refers to.
- Parameters:
twine (twined.Twine|dict|str) – the twine, dictionary defining a twine, or path to “twine.json” file defining the service’s data interface
handle_monitor_message (callable|None) – an optional function for sending monitor messages to the parent that requested the analysis
configuration_values (any) – the configuration values for the analysis - this can be expressed as a python primitive (e.g. dict), a path to a JSON file, or a JSON string.
configuration_manifest (octue.resources.manifest.Manifest) – a manifest of configuration datasets for the analysis if required
input_values (any) – the input values for the analysis - this can be expressed as a python primitive (e.g. dict), a path to a JSON file, or a JSON string.
input_manifest (octue.resources.manifest.Manifest) – a manifest of input datasets for the analysis if required
output_values (any) – any output values the analysis produces
output_manifest (octue.resources.manifest.Manifest) – a manifest of output datasets from the analysis if it produces any
children (dict) – a mapping of string key to Child instance for all the children used by the service
id (str) – Optional UUID for the analysis
- Return None:
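The four hash attributes listed above can be gathered into a small provenance record to store alongside an output. This is a sketch only: `collect_hashes` is a hypothetical helper, and a stand-in object is used in place of a real Analysis.

```python
from types import SimpleNamespace

# The hash attribute names documented for Analysis.
HASH_ATTRIBUTES = (
    "input_values_hash",
    "input_manifest_hash",
    "configuration_values_hash",
    "configuration_manifest_hash",
)


def collect_hashes(analysis):
    """Return a dictionary of the analysis's input and configuration hashes.

    Strands that are None have a None hash, so those entries are kept as None.
    """
    return {name: getattr(analysis, name, None) for name in HASH_ATTRIBUTES}


# Stand-in for an Analysis whose twine only specifies input values.
stand_in_analysis = SimpleNamespace(
    input_values_hash="abc123",
    input_manifest_hash=None,
    configuration_values_hash=None,
    configuration_manifest_hash=None,
)
print(collect_hashes(stand_in_analysis))
```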
- property finalised
Check whether the analysis has been finalised (i.e. whether its outputs have been validated and, if an output manifest is produced, its datasets uploaded).
- Return bool:
- send_monitor_message(data)
Send a monitor message to the parent that requested the analysis.
- Parameters:
data (any) – any JSON-compatible data structure
- Return None:
- set_up_periodic_monitor_message(create_monitor_message, period=60)
Set up a periodic monitor message that sends up-to-date data once per period.
- Parameters:
create_monitor_message (callable) – a callable that takes no arguments and returns a new up-to-date monitor message to send each time it’s called
period (int|float) – the repetition period in seconds
- Return None:
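Because create_monitor_message takes no arguments, any state it reports has to be captured some other way, for example in a closure. The sketch below shows that pattern; `make_progress_reporter` and the `state` dictionary are hypothetical names.

```python
import time


def make_progress_reporter(state):
    """Build a zero-argument callable that reports the current contents of `state`."""

    def create_monitor_message():
        # Read the shared state at call time so each message is up to date.
        return {"iterations_done": state["iterations_done"], "timestamp": time.time()}

    return create_monitor_message


state = {"iterations_done": 0}
reporter = make_progress_reporter(state)

state["iterations_done"] = 5
print(reporter()["iterations_done"])  # 5
```

In an app, the callable would be registered with `analysis.set_up_periodic_monitor_message(create_monitor_message=reporter, period=10)` and the app would update `state` as it runs.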
- finalise(upload_output_datasets_to=None, use_signed_urls=None)
Validate the output values and output manifest and, if the analysis produced an output manifest, upload its output datasets to a unique subdirectory within the analysis’s output location. This output location can be overridden by providing a different cloud path via the upload_output_datasets_to parameter.
- Parameters:
upload_output_datasets_to (str|None) – If not provided but an output location was provided at instantiation, upload any output datasets into a unique subdirectory within this output location; if provided, upload into this location instead. The output manifest is updated with the upload locations.
use_signed_urls (bool|None) – if True, use signed URLs instead of cloud URIs for dataset paths in the output manifest; if None, use the value of use_signed_urls_for_output_datasets given at instantiation
- Return None:
- add_labels(*args)
Add one or more new labels to the object. New labels will be cleaned and validated.
- add_tags(tags=None, **kwargs)
Add one or more new tags to the object. New tags will be cleaned and validated.
- classmethod deserialise(serialised_object, from_string=False)
Deserialise the given JSON-serialised object into an instance of the class.
- Parameters:
serialised_object (str|dict) – the string or dictionary of python primitives to deserialise into an instance
from_string (bool) – if True, deserialise from a JSON string; otherwise, deserialise from a dictionary
- Return any:
an instance of the class
- classmethod from_file(path, **kwargs)
Deserialise an instance from the given file.
- Parameters:
path (str) – the path to the JSON file containing the serialised instance
kwargs – kwargs to pass in to the JSON deserialisation
- Return any:
an instance of the class
- property id
Get the ID of the identifiable instance.
- Return str:
- property labels
Get the labels of the labelled object.
- Return iter:
- property name
Get the name of the identifiable instance.
- Return str:
- serialise(**kwargs)
Serialise the instance to a JSON string of primitives. See the Serialisable constructor for more information.
- Parameters:
kwargs – kwargs to pass in to the JSON serialisation
- Return str:
a JSON string containing the instance as a serialised python primitive
- property tags
Get the tags of the taggable instance.
- Return iter:
- to_file(path, **kwargs)
Write the instance to a JSON file.
- Parameters:
path (str) – path of file to write to, including relative or absolute path and .json extension
kwargs – kwargs to pass in to the JSON serialisation
- Return None:
- to_primitive()
Convert the instance into a JSON-compatible python dictionary of its attributes as primitives. See the Serialisable constructor for more information.
- Return dict:
Child
- class octue.resources.child.Child(id, backend, internal_sruid='local/local:local', service_registries=None)
A class representing an Octue child service that can be asked questions. This is a convenience wrapper for Service that makes asking questions more intuitive and allows easier selection of backends.
- Parameters:
id (str) – the ID of the child
backend (dict) – must include the key “name” with a value of the name of the type of backend e.g. “GCPPubSubBackend” and key-value pairs for any other parameters the chosen backend expects
internal_sruid (str) – the SRUID to give to the internal service used to ask questions to the child
service_registries (iter(dict)|None) – the names and endpoints of the registries used to resolve the child’s service revision when asking it questions; these should be in priority order (highest priority first)
- Return None:
- property received_events
Get the events received from the child if it has been asked a question. If it hasn’t, None is returned. If an empty list is returned, no messages have been received.
- Return list(dict)|None:
- ask(input_values=None, input_manifest=None, children=None, subscribe_to_logs=True, allow_local_files=False, handle_monitor_message=None, record_events=True, save_diagnostics='SAVE_DIAGNOSTICS_ON_CRASH', question_uuid=None, parent_question_uuid=None, originator_question_uuid=None, originator=None, push_endpoint=None, asynchronous=False, retry_count=0, raise_errors=True, max_retries=0, prevent_retries_when=None, log_errors=True, timeout=86400, maximum_heartbeat_interval=300)
Ask the child either:
- A synchronous (ask-and-wait) question and wait for it to return an output. Questions are synchronous if the push_endpoint isn’t provided and asynchronous=False.
- An asynchronous (fire-and-forget) question and return immediately. To make a question asynchronous, provide the push_endpoint argument or set asynchronous=True.
- Parameters:
input_values (any|None) – any input values for the question, conforming with the schema in the child’s twine
input_manifest (octue.resources.manifest.Manifest|None) – an input manifest of any datasets needed for the question, conforming with the schema in the child’s twine
children (list(dict)|None) – a list of children for the child to use instead of its default children (if it uses children). These should be in the same format as in an app’s app configuration file and have the same keys.
subscribe_to_logs (bool) – if True, subscribe to logs from the child and handle them with the local log handlers
allow_local_files (bool) – if True, allow the input manifest to contain references to local files - this should only be set to True if the child will have access to these local files
handle_monitor_message (callable|None) – a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive as an argument (note that this could be an array or object)
record_events (bool) – if True, record messages received from the child in the received_events property
save_diagnostics (str) – must be one of {“SAVE_DIAGNOSTICS_OFF”, “SAVE_DIAGNOSTICS_ON_CRASH”, “SAVE_DIAGNOSTICS_ON”}; if turned on, allow the input values and manifest (and its datasets) to be saved by the child either all the time or just if it fails while processing them
question_uuid (str|None) – the UUID to use for the question if a specific one is needed; a UUID is generated if not
parent_question_uuid (str|None) – the UUID of the question that triggered this question
originator_question_uuid (str|None) – the UUID of the question that triggered all ancestor questions of this question; if None, this question is assumed to be the originator question
originator (str|None) – the SRUID of the service revision that triggered all ancestor questions of this question; if None, this service revision is assumed to be the originator
push_endpoint (str|None) – if answers to the question should be pushed to an endpoint, provide its URL here (the returned subscription will be a push subscription); if not, leave this as None
asynchronous (bool) – if True, don’t wait for an answer or create an answer subscription (the result and other events can be retrieved from the event store later)
retry_count (int) – the retry count of the question (this is zero if it’s the first attempt at the question)
raise_errors (bool) – if True and the question fails, raise the error; if False, return the error in place of the answer
max_retries (int) – if raise_errors=False and the question fails, retry the question up to this number of times
prevent_retries_when (list(type)|None) – if raise_errors=False and the question fails, prevent retrying the question if it fails with an exception type in this list
log_errors (bool) – if True, raise_errors=False, and the question fails after its final retry, log the error
timeout (float) – time in seconds to wait for an answer before raising a timeout error
maximum_heartbeat_interval (float|int) – the maximum amount of time (in seconds) allowed between child heartbeats before an error is raised
- Raises:
TimeoutError – if the timeout is exceeded while waiting for an answer
Exception – if the question raises an error and raise_errors=True
- Return dict|octue.cloud.pub_sub.subscription.Subscription|Exception|None, str:
for a synchronous question, a dictionary containing the keys “output_values” and “output_manifest” from the result (or just an exception if the question fails), and the question UUID; for a question with a push endpoint, the push subscription and the question UUID; for an asynchronous question, None and the question UUID
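With raise_errors=False, a synchronous question returns the error in place of the answer, so the caller has to check which one it received. The sketch below shows one way to do that; `ask_safely` is a hypothetical helper and `StubChild` is a stand-in used so the example runs without a deployed service.

```python
def ask_safely(child, input_values, max_retries=2):
    """Ask a synchronous question and separate the output values from any error."""
    answer, question_uuid = child.ask(
        input_values=input_values,
        raise_errors=False,
        max_retries=max_retries,
    )
    if isinstance(answer, Exception):
        # The error was returned unraised because raise_errors=False.
        return None, question_uuid, answer
    return answer["output_values"], question_uuid, None


class StubChild:
    """Hypothetical stand-in for Child that answers locally."""

    def ask(self, input_values=None, raise_errors=True, max_retries=0, **kwargs):
        if input_values is None:
            return ValueError("No input values given."), "question-0"
        answer = {"output_values": {"doubled": input_values["x"] * 2}, "output_manifest": None}
        return answer, "question-1"


print(ask_safely(StubChild(), {"x": 21}))
print(ask_safely(StubChild(), None))
```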
- ask_multiple(*questions, raise_errors=True, max_retries=0, prevent_retries_when=None, max_workers=None, log_errors=True)
Ask the child multiple questions in parallel and wait for the answers. Each question should be provided as a dictionary of Child.ask keyword arguments. The raise_errors, max_retries, prevent_retries_when, and log_errors arguments have the same effect as in Child.ask, applied to all questions. These values may be overridden on a per-question basis by specifying them in the question dictionary.
- Parameters:
questions – any number of questions provided as dictionaries of arguments to the Child.ask method
raise_errors (bool) – if True, an error is raised and no answers are returned if any of the individual questions raise an error; if False, answers are returned for all successful questions while errors are returned unraised for any failed ones
max_retries (int) – retry any questions that failed up to this number of times (note: this will have no effect unless raise_errors=False)
prevent_retries_when (list(type)|None) – prevent retrying any questions that fail with an exception type in this list (note: this will have no effect unless raise_errors=False)
max_workers (int|None) – the maximum number of questions that can be asked at once; defaults to the lowest of {32, no. of CPUs + 4, and no. of questions} (see concurrent.futures.ThreadPoolExecutor)
log_errors (bool) – if True and raise_errors=False, log any errors remaining once retries are exhausted
- Return list((dict|octue.cloud.pub_sub.subscription.Subscription|Exception|None, str)):
the answers to the questions and the question UUIDs (in the same order as asked)
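Each question is a dictionary of Child.ask keyword arguments, and with raise_errors=False the returned list mixes answers and unraised errors. The sketch below builds such questions and splits a result list; `split_answers` is a hypothetical helper and the `results` list is made-up data in the documented return shape.

```python
def split_answers(results):
    """Split (answer, question_uuid) pairs into successes and failures."""
    successes, failures = [], []
    for answer, question_uuid in results:
        if isinstance(answer, Exception):
            failures.append((question_uuid, answer))
        else:
            successes.append((question_uuid, answer))
    return successes, failures


# One dictionary of Child.ask keyword arguments per question.
questions = [{"input_values": {"x": x}} for x in (1, 2, 3)]

# Made-up results in the documented shape, standing in for the output of
# child.ask_multiple(*questions, raise_errors=False, max_retries=1).
results = [
    ({"output_values": {"y": 2}, "output_manifest": None}, "uuid-1"),
    (RuntimeError("The child failed."), "uuid-2"),
    ({"output_values": {"y": 6}, "output_manifest": None}, "uuid-3"),
]

successes, failures = split_answers(results)
print(len(successes), len(failures))  # 2 1
```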
Child emulator
- class octue.cloud.emulators.child.ChildEmulator(events=None, **kwargs)
An emulator for the octue.resources.child.Child class that handles the given events without contacting the real child or using Pub/Sub. Any events a real child could produce are supported. Child instances can be replaced/mocked like-for-like by ChildEmulator without the parent knowing.
- Parameters:
events (list(dict(dict))|None) – the list of events to send to the parent; each event must have an “event” key and an “attributes” key, and all events must conform to the service communication schema
kwargs – any number of keyword arguments that would normally be passed to Child.__init__
- Return None:
- property received_events
Get the events received from the child.
- Return list(dict):
- ask(handle_monitor_message=None, record_events=True, asynchronous=False, **kwargs)
Ask the child emulator a question and receive its emulated response events. Unlike a real child, the input values and manifest are not validated against the schema in the child’s twine, as it is only available to the real child. Hence, the input values and manifest do not affect the events returned by the emulator.
- Parameters:
handle_monitor_message (callable|None) – a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive as an argument (note that this could be an array or object)
record_events (bool) – if True, record events received from the child in the received_events property
asynchronous (bool) – if True, don’t wait for an answer or create an answer subscription (the result and other events can be retrieved from the event store later)
kwargs – any number of keyword arguments that would normally be passed to Child.ask
- Return (dict, str)|(None, str):
a dictionary containing the keys “output_values” and “output_manifest” (or None if the question is asynchronous), and the question UUID
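The replay idea behind the emulator can be sketched without the library. Everything here is an assumption made for illustration: `EventReplayer` is a hypothetical class, the `"kind"` and `"data"` keys inside each event are assumed, and the empty `"attributes"` dictionaries would not satisfy the real service communication schema.

```python
import uuid


class EventReplayer:
    """Hypothetical stand-in for an emulator: replays given events instead of contacting a child."""

    def __init__(self, events=None):
        self.events = events or []
        self.received_events = []

    def ask(self, handle_monitor_message=None, record_events=True, **kwargs):
        # The inputs in kwargs are deliberately ignored: they do not affect the replayed events.
        question_uuid = str(uuid.uuid4())
        answer = None

        for event in self.events:
            if record_events:
                self.received_events.append(event)

            body = event["event"]

            if body["kind"] == "monitor_message" and handle_monitor_message is not None:
                handle_monitor_message(body["data"])
            elif body["kind"] == "result":
                answer = {
                    "output_values": body.get("output_values"),
                    "output_manifest": body.get("output_manifest"),
                }

        return answer, question_uuid


events = [
    {"event": {"kind": "monitor_message", "data": {"progress": 0.5}}, "attributes": {}},
    {"event": {"kind": "result", "output_values": [1, 2, 3]}, "attributes": {}},
]

monitor_messages = []
emulator = EventReplayer(events=events)
answer, question_uuid = emulator.ask(
    input_values={"ignored": True},
    handle_monitor_message=monitor_messages.append,
)
```

The answer depends only on the events supplied at instantiation, which is what lets an emulator be swapped in for a real child in tests.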
Filter containers
FilterSet
- class octue.resources.filter_containers.FilterSet
- filter(ignore_items_without_attribute=True, **kwargs)
Return a new instance containing only the Filterables for which the given filter criteria are True.
- Parameters:
ignore_items_without_attribute (bool) – if True, just ignore any members of the container without a filtered-for attribute rather than raising an error
{str – any} kwargs: keyword arguments whose keys are the name of the filter and whose values are the values to filter for
- Return octue.resources.filter_containers.FilterContainer:
- one(**kwargs)
If a single result exists for the given filters, return it. Otherwise, raise an error.
- Parameters:
{str – any} kwargs: keyword arguments whose keys are the name of the filter and whose values are the values to filter for
- Raises:
octue.exceptions.UnexpectedNumberOfResultsException – if zero or more than one result satisfies the filters
- Return octue.resources.mixins.filterable.Filterable:
- order_by(attribute_name, check_start_value=None, check_constant_increment=None, reverse=False)
Order the Filterables in the container by an attribute with the given name, returning them as a new FilterList regardless of the type of filter container begun with (FilterSets and FilterDicts are inherently orderless).
- Parameters:
attribute_name (str) – name of attribute (optionally nested) to order by e.g. “a”, “a.b”, “a.b.c”
check_start_value (any) – if provided, check that the first item in the ordered container has the given start value for the attribute ordered by
check_constant_increment (int|float|None) – if given, check that the ordered-by attribute of each of the items in the ordered container increases by the given value when progressing along the sequence
reverse (bool) – if True, reverse the ordering
- Raises:
octue.exceptions.InvalidInputException – if an attribute with the given name doesn’t exist on any of the container’s members
- Return FilterList:
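The ordering and checking behaviour of order_by can be sketched as follows. This is a standalone illustration under stated assumptions, not the library's code: the free function `order_by` and the `Sample` class are hypothetical, and `ValueError` stands in for the library's own exceptions.

```python
from operator import attrgetter
from types import SimpleNamespace


class Sample:
    """Hypothetical item with a nested attribute to order by."""

    def __init__(self, name, index):
        self.name = name
        self.meta = SimpleNamespace(index=index)


def order_by(items, attribute_name, check_start_value=None, check_constant_increment=None, reverse=False):
    """Return the items as a list ordered by a (possibly nested) attribute."""
    # attrgetter resolves dotted names such as "a.b.c".
    get_value = attrgetter(attribute_name)

    try:
        ordered = sorted(items, key=get_value, reverse=reverse)
    except AttributeError:
        raise ValueError(f"The attribute {attribute_name!r} is missing from an item.") from None

    values = [get_value(item) for item in ordered]

    if check_start_value is not None and values and values[0] != check_start_value:
        raise ValueError(f"Expected a start value of {check_start_value!r} but found {values[0]!r}.")

    if check_constant_increment is not None:
        # Compare each neighbouring pair of values (exact comparison, so best suited to integers).
        for previous, current in zip(values, values[1:]):
            if current - previous != check_constant_increment:
                raise ValueError(f"The increment between {previous!r} and {current!r} is not constant.")

    return ordered


samples = {Sample("c", 2), Sample("a", 0), Sample("b", 1)}
ordered = order_by(samples, "meta.index", check_start_value=0, check_constant_increment=1)
names = [sample.name for sample in ordered]
```

The input is an unordered set and the result is a list, mirroring how ordering always produces a list-like container whatever the original container type was.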
FilterList
- class octue.resources.filter_containers.FilterList(iterable=(), /)
- filter(ignore_items_without_attribute=True, **kwargs)
Return a new instance containing only the Filterables for which the given filter criteria are True.
- Parameters:
ignore_items_without_attribute (bool) – if True, just ignore any members of the container without a filtered-for attribute rather than raising an error
{str – any} kwargs: keyword arguments whose keys are the name of the filter and whose values are the values to filter for
- Return octue.resources.filter_containers.FilterContainer:
- one(**kwargs)
If a single result exists for the given filters, return it. Otherwise, raise an error.
- Parameters:
{str – any} kwargs: keyword arguments whose keys are the name of the filter and whose values are the values to filter for
- Raises:
octue.exceptions.UnexpectedNumberOfResultsException – if zero or more than one result satisfies the filters
- Return octue.resources.mixins.filterable.Filterable:
- order_by(attribute_name, check_start_value=None, check_constant_increment=None, reverse=False)
Order the Filterables in the container by an attribute with the given name, returning them as a new FilterList regardless of the type of filter container begun with (FilterSets and FilterDicts are inherently orderless).
- Parameters:
attribute_name (str) – name of attribute (optionally nested) to order by e.g. “a”, “a.b”, “a.b.c”
check_start_value (any) – if provided, check that the first item in the ordered container has the given start value for the attribute ordered by
check_constant_increment (int|float|None) – if given, check that the ordered-by attribute of each of the items in the ordered container increases by the given value when progressing along the sequence
reverse (bool) – if True, reverse the ordering
- Raises:
octue.exceptions.InvalidInputException – if an attribute with the given name doesn’t exist on any of the container’s members
- Return FilterList:
FilterDict
- class octue.resources.filter_containers.FilterDict(dict=None, /, **kwargs)
A dictionary that is filterable by its values’ attributes. Each key can be anything, but each value must be an octue.mixins.filterable.Filterable instance.
- filter(ignore_items_without_attribute=True, **kwargs)
Return a new instance containing only the Filterables for which the given filter criteria are satisfied.
- Parameters:
ignore_items_without_attribute (bool) – if True, just ignore any members of the container without a filtered-for attribute rather than raising an error
{str – any} kwargs: keyword arguments whose keys are the name of the filter and whose values are the values to filter for
- Return FilterDict:
- order_by(attribute_name, reverse=False)
Order the instance by the given attribute_name, returning the instance’s elements as a new FilterList.
- Parameters:
attribute_name (str) – name of attribute (optionally nested) to order by e.g. “a”, “a.b”, “a.b.c”
reverse (bool) – if True, reverse the ordering
- Raises:
octue.exceptions.InvalidInputException – if an attribute with the given name doesn’t exist on any of the FilterDict’s values
- Return FilterList:
- one(**kwargs)
If a single item exists for the given filters, return it. Otherwise, raise an error.
- Parameters:
{str – any} kwargs: keyword arguments whose keys are the name of the filter and whose values are the values to filter for
- Raises:
octue.exceptions.UnexpectedNumberOfResultsException – if zero or more than one result satisfies the filters
- Return (any, octue.resources.mixins.filterable.Filterable):
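A dictionary filtered by its values’ attributes, with `one` returning a (key, value) pair, might look like the sketch below. It is a simplification under stated assumptions: `filter_values` and `one` are hypothetical free functions, each filter is treated as a plain attribute name compared for equality (the library's filter naming may differ), and `LookupError` stands in for the library's exception.

```python
from types import SimpleNamespace


def filter_values(mapping, ignore_items_without_attribute=True, **criteria):
    """Return a new dict containing only the entries whose values satisfy every criterion."""
    filtered = {}

    for key, value in mapping.items():
        matches = True

        for attribute_name, wanted in criteria.items():
            if not hasattr(value, attribute_name):
                if not ignore_items_without_attribute:
                    raise AttributeError(f"{key!r} has no attribute {attribute_name!r}.")
                # Values without the filtered-for attribute are skipped rather than raising.
                matches = False
                break

            if getattr(value, attribute_name) != wanted:
                matches = False
                break

        if matches:
            filtered[key] = value

    return filtered


def one(mapping, **criteria):
    """Return the single matching (key, value) pair or raise if there isn't exactly one."""
    results = filter_values(mapping, **criteria)

    if len(results) != 1:
        raise LookupError(f"Expected exactly one result but found {len(results)}.")

    return next(iter(results.items()))


files = {
    "first": SimpleNamespace(extension="csv"),
    "second": SimpleNamespace(extension="json"),
    "third": SimpleNamespace(extension="json"),
    "fourth": SimpleNamespace(size=1),
}

json_files = filter_values(files, extension="json")
key, value = one(files, extension="csv")
```

Asking for `one(files, extension="json")` raises here because two values match, which is the "more than one result" case described above.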
Configuration
- octue.configuration.load_service_and_app_configuration(service_configuration_path=None)
Load the service configuration from the given YAML file and the app configuration referenced in it. If no app configuration is referenced, an empty one is returned.
- Parameters:
service_configuration_path (str|None) – the path to the service configuration YAML file; if not provided, the OCTUE_SERVICE_CONFIGURATION_PATH environment variable is used if present, otherwise the local path octue.yaml is used
- Return (octue.configuration.ServiceConfiguration, octue.configuration.AppConfiguration):
the service configuration loaded from the YAML file and the app configuration specified by the service configuration (or an empty app configuration if none is specified)
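The lookup order for the service configuration path (explicit argument, then environment variable, then `octue.yaml`) can be sketched like this. The function name `resolve_service_configuration_path` and its `environment` parameter are hypothetical, added so the example can be run without touching the real environment; empty strings are treated as absent, which is an assumption.

```python
import os

DEFAULT_PATH = "octue.yaml"
ENVIRONMENT_VARIABLE = "OCTUE_SERVICE_CONFIGURATION_PATH"


def resolve_service_configuration_path(service_configuration_path=None, environment=None):
    """Pick the configuration path: explicit argument first, then the environment variable, then the default."""
    if environment is None:
        environment = os.environ

    return service_configuration_path or environment.get(ENVIRONMENT_VARIABLE) or DEFAULT_PATH


from_argument = resolve_service_configuration_path("custom/octue.yaml", environment={ENVIRONMENT_VARIABLE: "env.yaml"})
from_environment = resolve_service_configuration_path(environment={ENVIRONMENT_VARIABLE: "env.yaml"})
from_default = resolve_service_configuration_path(environment={})
```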
Service configuration
- class octue.configuration.ServiceConfiguration(name, namespace, app_source_path='.', twine_path='twine.json', app_configuration_path=None, diagnostics_cloud_path=None, service_registries=None, event_store_table_id=None, delete_local_files=False, directory=None, **kwargs)
A class containing the details needed to configure a service.
- Parameters:
name (str) – the name to give the service
namespace (str) – the namespace for grouping the service with others (e.g. the name of an organisation or individual)
app_source_path (str) – the path to the directory containing the app’s source code
twine_path (str) – the path to the twine file defining the schema for input, output, and configuration data for the service
app_configuration_path (str|None) – the path to the app configuration file containing configuration data for the service; if this is None, the default application configuration is used
diagnostics_cloud_path (str|None) – the path to a cloud directory to store diagnostics (this includes the configuration, input values and manifest, and logs for each question)
service_registries (iter(dict)|None) – the names and endpoints of the registries used to resolve service revisions when asking questions; these should be in priority order (highest priority first)
event_store_table_id (str|None) – the full ID of the Google BigQuery table used as the event store e.g. “your-project.your-dataset.your-table”
delete_local_files (bool) – if True, delete any files downloaded and temporary directories created during an analysis once it’s finished
directory (str|None) – if provided, find the app source, twine, and app configuration relative to this directory
- Return None:
- classmethod from_file(path=None)
Load a service configuration from a YAML file.
- Parameters:
path (str|None) – the path to the service configuration YAML file; if not provided, the OCTUE_SERVICE_CONFIGURATION_PATH environment variable is used if present, otherwise the local path octue.yaml is used
- Return ServiceConfiguration:
the service configuration loaded from the file
App configuration
- class octue.configuration.AppConfiguration(configuration_values=None, configuration_manifest=None, children=None, output_location=None, use_signed_urls_for_output_datasets=False, **kwargs)
A class containing the configuration data needed to start an app as a service. The configuration data should conform to the service’s twine schema.
- Parameters:
configuration_values (str|dict|list|None) – values to configure the app
configuration_manifest (str|dict|octue.resources.Manifest|None) – a manifest of datasets to configure the app
children (str|list(dict)|None) – details of the children the app requires
output_location (str|None) – the path to a cloud directory to save output datasets at
use_signed_urls_for_output_datasets (bool) – if True, use signed URLs instead of cloud URIs for dataset paths in the output manifest
- Return None:
- classmethod from_file(path)
Load an app configuration from a file.
- Parameters:
path (str)
- Return AppConfiguration:
Runner
- class octue.runner.Runner(app_src, twine='twine.json', configuration_values=None, configuration_manifest=None, children=None, output_location=None, use_signed_urls_for_output_datasets=False, diagnostics_cloud_path=None, project_name=None, service_id=None, service_registries=None, delete_local_files=False)
A runner of analyses for a given service.
The Runner class provides a set of configuration parameters for use by your application, together with a range of methods for managing input and output file parsing as well as controlling logging.
- Parameters:
app_src (callable|type|module|str) – either a function that accepts an Octue analysis, a class with a run method that accepts an Octue analysis, or a path to a directory containing an app.py file containing one of these
twine (str|dict|twined.Twine) – path to the twine file, a string containing valid twine json, or a Twine instance
configuration_values (str|dict|None) – The configuration_values strand data. Can be expressed as a string path of a *.json file (relative or absolute), as an open file-like object (containing json data), as a string of json data or as an already-parsed dict.
configuration_manifest (str|dict|None) – The configuration_manifest strand data. Can be expressed as a string path of a *.json file (relative or absolute), as an open file-like object (containing json data), as a string of json data or as an already-parsed dict.
children (str|list(dict)|None) – The children strand data. Can be expressed as a string path of a *.json file (relative or absolute), as an open file-like object (containing json data), as a string of json data or as an already-parsed dict.
output_location (str|None) – the path to a cloud directory to save output datasets at
use_signed_urls_for_output_datasets (bool) – if True, use signed URLs instead of cloud URIs for dataset paths in the output manifest
diagnostics_cloud_path (str|None) – the path to a cloud directory to store diagnostics (this includes the configuration, input values and manifest, and logs for each question)
project_name (str|None) – name of Google Cloud project to get credentials from
service_id (str|None) – the ID of the service being run
delete_local_files (bool) – if True, delete any files downloaded and registered temporary directories created during an analysis once it’s finished
- Return None:
- classmethod from_configuration(service_configuration, app_configuration, project_name=None, service_id=None, **overrides)
Instantiate a runner from a service and app configuration.
- Parameters:
service_configuration (octue.configuration.ServiceConfiguration)
app_configuration (octue.configuration.AppConfiguration)
project_name (str|None) – name of Google Cloud project to get credentials from
service_id (str|None) – the ID of the service being run
overrides – optional keyword arguments to override the Runner instantiation parameters extracted from the service and app configuration
- Return octue.runner.Runner:
a runner configured with the given service and app configuration
- run(analysis_id=None, input_values=None, input_manifest=None, children=None, analysis_log_level=20, analysis_log_handler=None, handle_monitor_message=None, save_diagnostics='SAVE_DIAGNOSTICS_ON_CRASH', originator_question_uuid=None, originator=None)
Run an analysis.
- Parameters:
analysis_id (str|None) – UUID of analysis
input_values (str|dict|None) – the input_values strand data. Can be expressed as a string path of a *.json file (relative or absolute), as an open file-like object (containing json data), as a string of json data or as an already-parsed dict.
input_manifest (str|dict|octue.resources.manifest.Manifest|None) – The input_manifest strand data. Can be expressed as a string path of a *.json file (relative or absolute), as an open file-like object (containing json data), as a string of json data or as an already-parsed dict.
children (list(dict)|None) – a list of children to use instead of the children provided at instantiation. These should be in the same format as in an app’s app configuration file and have the same keys.
analysis_log_level (int|str) – the level below which to ignore log messages (the default of 20 corresponds to logging.INFO)
analysis_log_handler (logging.Handler|None) – the logging.Handler instance which will be used to handle logs for this analysis run. Handlers can be created as per the logging cookbook https://docs.python.org/3/howto/logging-cookbook.html but should use the format defined above in LOG_FORMAT.
handle_monitor_message (callable|None) – a function that sends monitor messages to the parent that requested the analysis
save_diagnostics (str) – must be one of {“SAVE_DIAGNOSTICS_OFF”, “SAVE_DIAGNOSTICS_ON_CRASH”, “SAVE_DIAGNOSTICS_ON”}; if turned on, allow the input values and manifest (and its datasets) to be saved either all the time or just if the analysis fails
originator_question_uuid (str|None) – the UUID of the question that triggered all ancestor questions of this analysis; if None, this question is assumed to be the originator question
originator (str|None) – the SRUID of the service revision that triggered all ancestor questions of this question; if None, this service revision is assumed to be the originator
- Return octue.resources.analysis.Analysis:
Octue essential monitor messages
A module containing helper functions for sending monitor messages that conform to the Octue essential monitor message schema https://jsonschema.registry.octue.com/octue/essential-monitors/0.0.2.json
- octue.essentials.monitor_messages.send_status_text(analysis, text, service_name)
Send a status-type monitor message and additionally log it to the info level.
- Parameters:
analysis (octue.resources.analysis.Analysis) – the analysis from which to send the status text
text (str) – the text of the status message
service_name (str) – the name of the service/child running the analysis
- Return None:
- octue.essentials.monitor_messages.send_estimated_seconds_remaining(analysis, estimated_seconds_remaining, service_name)
Send an estimated-seconds-remaining monitor message.
- Parameters:
analysis (octue.resources.analysis.Analysis) – the analysis from which to send the estimate
estimated_seconds_remaining (float)
service_name (str) – the name of the service/child running the analysis
Octue log handler
- octue.log_handlers.apply_log_handler(logger_name=None, logger=None, handler=None, log_level=20, formatter=None, include_line_number=False, include_process_name=False, include_thread_name=False)
Apply a log handler with the given formatter to the logger with the given name. By default, the default Octue log handler is used on the root logger.
- Parameters:
logger_name (str|None) – the name of the logger to apply the handler to; if this and logger are None, the root logger is used
logger (logging.Logger|None) – the logger instance to apply the handler to (takes precedence over a logger name)
handler (logging.Handler|None) – The handler to use. If None, the default StreamHandler will be attached.
log_level (int|str) – ignore log messages below this level
formatter (logging.Formatter|None) – if provided, this formatter is used and the other formatting options are ignored
include_line_number (bool) – if True, include the line number in the log context
include_process_name (bool) – if True, include the process name in the log context
include_thread_name (bool) – if True, include the thread name in the log context
- Return logging.Handler:
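The general pattern of attaching a handler and formatter to a named or root logger can be sketched with the standard logging module. This is not the library's implementation: `apply_handler` is a hypothetical function, and the fallback format string is an assumption rather than Octue's own log format.

```python
import io
import logging


def apply_handler(logger_name=None, logger=None, handler=None, log_level=logging.INFO, formatter=None):
    """Attach a handler with a formatter to a logger and return the handler."""
    if logger is None:
        # logging.getLogger(None) returns the root logger.
        logger = logging.getLogger(logger_name)

    if handler is None:
        handler = logging.StreamHandler()

    if formatter is None:
        # Assumed format for illustration only.
        formatter = logging.Formatter("[%(asctime)s | %(levelname)s | %(name)s] %(message)s")

    handler.setFormatter(formatter)
    handler.setLevel(log_level)  # Levels can be given as integers (20) or names ("INFO").
    logger.addHandler(handler)
    logger.setLevel(log_level)
    return handler


stream = io.StringIO()

applied_handler = apply_handler(
    logger_name="example",
    handler=logging.StreamHandler(stream),
    log_level=20,
    formatter=logging.Formatter("%(levelname)s %(message)s"),
)

example_logger = logging.getLogger("example")
example_logger.debug("hidden")  # Below the INFO level, so ignored.
example_logger.info("shown")
```

A level of 20 is the integer value of `logging.INFO`, so debug messages are dropped while info messages reach the stream.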