Asking services questions
What is a question?
A question is a set of data (input values and/or an input manifest) sent to a child for processing/analysis. Questions can be:
Synchronous (“ask-and-wait”): A question whose answer is waited for in real time
Asynchronous (“fire-and-forget”): A question whose answer is not waited for and is instead retrieved later. There are two types:
Regular: Responses to these questions are automatically stored in an event store where they can be retrieved using the Octue SDK
Push endpoint: Responses to these questions are pushed to an HTTP endpoint for asynchronous handling using Octue’s django-twined or custom logic in your own webserver.
Questions are always asked to a revision of a service. You can ask a service a question if you have its SRUID, project name, and the necessary permissions.
Asking a question
from octue.resources import Child
child = Child(
id="my-organisation/my-service:2.1.7",
backend={"name": "GCPPubSubBackend", "project_name": "my-project"},
)
answer, question_uuid = child.ask(
input_values={"height": 32, "width": 3},
input_manifest=manifest,
)
answer["output_values"]
>>> {"some": "data"}
answer["output_manifest"]["my_dataset"].files
>>> <FilterSet({<Datafile('my_file.csv')>, <Datafile('another_file.csv')>})>
Note
Not including a service revision tag will cause your question to be sent to the default revision of the service (usually the latest deployed version). This is determined by making a request to a service registry if one or more registries are defined. If none of the service registries contain an entry for this service, a specific service revision tag must be used.
You can also set the following options when you call Child.ask
:
children
- If the child has children of its own (i.e. grandchildren of the parent), this optional argument can be used to override the child’s “default” children. This allows you to specify particular versions of grandchildren to use (see this subsection below).subscribe_to_logs
- if true, the child will forward its logs to youallow_local_files
- if true, local files/datasets are allowed in any input manifest you supplyhandle_monitor_message
- if provided a function, it will be called on any monitor messages from the childrecord_events
– ifTrue
, events received from the parent while it processes the question are saved to theChild.received_events
propertysave_diagnostics
– must be one of {“SAVE_DIAGNOSTICS_OFF”, “SAVE_DIAGNOSTICS_ON_CRASH”, “SAVE_DIAGNOSTICS_ON”}; if turned on, allow the input values and manifest (and its datasets) to be saved by the child either all the time or just if the analysis failsquestion_uuid
- if provided, the question will use this UUID instead of a generated onepush_endpoint
- if provided, the result and other events produced during the processing of the question will be pushed to this HTTP endpoint (a URL)asynchronous
- ifTrue
, don’t wait for an answer to the question (the result and other events can be retrieved from the event store later)timeout
- how long in seconds to wait for an answer (None
by default - i.e. don’t time out)
If the question fails:
- If raise_errors=False
, the unraised error is returned
- If raise_errors=False
and max_retries > 0
, the question is retried up to this number of times
- If raise_errors=False
, max_retries > 0
, and prevent_retries_when
is a list of exception types, the question is retried unless the error type is in the list
- If raise_errors=False
, log_errors=True
, and the question fails after its final retry, the error is logged
Exceptions raised by a child
If a child raises an exception while processing your question, the exception will always be forwarded and re-raised in your local service or python session. You can handle exceptions in whatever way you like.
Timeouts
If setting a timeout, bear in mind that the question has to reach the child, the child has to run its analysis on the
inputs sent to it (this will most likely make up the dominant part of the wait time), and the answer has to be sent back
to the parent. If you’re not sure how long a particular analysis might take, it’s best to set the timeout to None
initially or ask the owner/maintainer of the child for an estimate.
Retrieving answers to asynchronous questions
To retrieve results and other events from the processing of a question later, make sure you have the permissions to access the event store and run:
from octue.cloud.pub_sub.bigquery import get_events
events = get_events(
table_id="your-project.your-dataset.your-table",
question_uuid="53353901-0b47-44e7-9da3-a3ed59990a71",
)
Options
question_uuid
- Retrieve events from this specific questionparent_question_uuid
- Retrieve events from questions triggered by the same parent question (this doesn’t include the parent question’s events)originator_question_uuid
- Retrieve events for the entire tree of questions triggered by an originator question (a question asked manually throughChild.ask
; this does include the originator question’s events)kind
- Only retrieve this kind of event if present (e.g. “result”)include_backend_metadata
- IfTrue
, retrieve information about the service backend that produced the eventlimit
- If set to a positive integer, limit the number of events returned to this
Note
Only one of question_uuid
, parent_question_uuid
, and originator_question_uuid
can be provided at one time.
See an example output here...
>>> events
[
{
"event": {
"kind": "delivery_acknowledgement",
},
},
{
"event": {
"kind": "log_record",
"log_record": {
"args": null,
"created": 1709739861.5949728,
"exc_info": null,
"exc_text": null,
"filename": "app.py",
"funcName": "run",
"levelname": "INFO",
"levelno": 20,
"lineno": 28,
"module": "app",
"msecs": 594.9728488922119,
"msg": "Finished example analysis.",
"name": "app",
"pathname": "/workspace/example_service_cloud_run/app.py",
"process": 2,
"processName": "MainProcess",
"relativeCreated": 8560.13798713684,
"stack_info": null,
"thread": 68328473233152,
"threadName": "ThreadPoolExecutor-0_2"
}
},
},
{
"event": {
"kind": "heartbeat",
},
},
{
"event": {
"kind": "result",
"output_manifest": {
"datasets": {
"example_dataset": {
"files": [
"gs://octue-sdk-python-test-bucket/example_output_datasets/example_dataset/output.dat"
],
"id": "419bff6b-08c3-4c16-9eb1-5d1709168003",
"labels": [],
"name": "divergent-strange-gharial-of-pizza",
"path": "https://storage.googleapis.com/octue-sdk-python-test-bucket/example_output_datasets/example_dataset/.signed_metadata_files/divergent-strange-gharial-of-pizza",
"tags": {}
}
},
"id": "a13713ae-f207-41c6-9e29-0a848ced6039",
"name": null
},
"output_values": [1, 2, 3, 4, 5]
},
},
]
Asking multiple questions in parallel
You can also ask multiple questions to a service in parallel - just provide questions as dictionaries of Child.ask arguments:
child.ask_multiple(
{"input_values": {"height": 32, "width": 3}},
{"input_values": {"height": 12, "width": 10}},
{"input_values": {"height": 7, "width": 32}},
)
>>> [
({"output_values": {"some": "output"}, "output_manifest": None}, '2681ef4e-4ab7-4cf9-8783-aad982d5e324'),
({"output_values": {"another": "result"}, "output_manifest": None}, '474923bd-14b6-4f4c-9bfe-8148358f35cd'),
({"output_values": {"different": "result"}, "output_manifest": None}, '9a50daae-2328-4728-9ddd-b2252474f118'),
]
This method uses multithreading, allowing all the questions to be asked at once instead of one after another.
Hint
The maximum number of threads that can be used to ask questions in parallel can be set via the max_workers
argument. It has no effect on the total number of questions that can be asked, just how many can be in progress at
once.
Asking a question within a service
If you have created your own Octue service and want to ask children questions, you can do
this more easily than above. Children are accessible from the analysis
object by the keys you give them in the
app configuration file. For example, you can ask an elevation
service a question like
this:
answer, question_uuid = analysis.children["elevation"].ask(input_values={"longitude": 0, "latitude": 1})
if your app configuration file is:
{
"children": [
{
"key": "wind_speed",
"id": "template-child-services/wind-speed-service:2.1.1",
"backend": {
"name": "GCPPubSubBackend",
"project_name": "my-project"
}
},
{
"key": "elevation",
"id": "template-child-services/elevation-service:3.1.9",
"backend": {
"name": "GCPPubSubBackend",
"project_name": "my-project"
}
}
]
}
and your twine.json
file includes the child keys in its children
field:
{
"children": [
{
"key": "wind_speed",
"purpose": "A service that returns the average wind speed for a given latitude and longitude.",
},
{
"key": "elevation",
"purpose": "A service that returns the elevation for a given latitude and longitude.",
}
]
}
See the parent service’s app configuration and app.py file in the child-services app template to see this in action.
Overriding a child’s children
If the child you’re asking a question to has its own children (static children), you can override these by providing the
IDs of the children you want it to use (dynamic children) to the Child.ask
method. Questions that would have gone to the static children will instead go to the dynamic children. Note that:
You must provide the children in the same format as they’re provided in the app configuration
If you override one static child, you must override others, too
The dynamic children must have the same keys as the static children (so the child knows which service to ask which questions)
You should ensure the dynamic children you provide are compatible with and appropriate for questions from the child service
For example, if the child requires these children in its app configuration:
[
{
"key": "wind_speed",
"id": "template-child-services/wind-speed-service:2.1.1",
"backend": {
"name": "GCPPubSubBackend",
"project_name": "octue-sdk-python"
},
},
{
"key": "elevation",
"id": "template-child-services/elevation-service:3.1.9",
"backend": {
"name": "GCPPubSubBackend",
"project_name": "octue-sdk-python"
},
}
]
then you can override them like this:
answer, question_uuid = child.ask(
input_values={"height": 32, "width": 3},
children=[
{
"key": "wind_speed",
"id": "my/own-service:1.0.0",
"backend": {
"name": "GCPPubSubBackend",
"project_name": "octue-sdk-python"
},
},
{
"key": "elevation",
"id": "organisation/another-service:0.1.0",
"backend": {
"name": "GCPPubSubBackend",
"project_name": "octue-sdk-python"
},
},
],
)
Overriding beyond the first generation
It’s an intentional choice to only go one generation deep with overriding children. If you need to be able to specify a whole tree of children, grandchildren, and so on, please upvote this issue.
Using a service registry
When asking a question, you can optionally specify one or more service registries to resolve SRUIDs against. This is analogous to specifying a
different pip
index for resolving package names when using pip install
. If you don’t specify any registries, the
default Octue service registry is used.
Specifying service registries can be useful if:
You have your own private services that aren’t on the default Octue service registry
You want services from one service registry with the same name as in another service registry to be prioritised
Specifying service registries
You can specify service registries in two ways:
Globally for all questions asked inside a service. In the service configuration (
octue.yaml
file):services: - namespace: my-organisation name: my-app service_registries: - name: my-registry endpoint: blah.com/services
For questions to a specific child, inside or outside a service:
child = Child( id="my-organisation/my-service:1.1.0", backend={"name": "GCPPubSubBackend", "project_name": "my-project"}, service_registries=[ {"name": "my-registry", "endpoint": "blah.com/services"}, ] )