Integrate with Lattice Tasking
This page describes how to integrate with
Task Manager, the service that lets
you create, update, monitor, and execute a task.
For reference see TaskManager
(gRPC | HTTP).
In an integration with Lattice, operators and taskable agents interact with Task Manager to progress a task through its full lifecycle. Task Manager handles task state management and the correct routing of tasks to taskable agents.
If you're integrating an agent with Lattice's tasking infrastructure, your agent should only use the following endpoints:
- ListenAsAgent: To monitor for, and receive, tasks.
- UpdateStatus: To report real-time status updates to Task Manager as the agent makes progress through a task.
Before you begin
- To publish taskable entities, and subscribe to tasks, set up your Lattice environment.
- Familiarize yourself with entities and different entity types.
Define tasks for an agent
An agent is an entity, typically an asset, or group of entities, that can complete
specific tasks. To let Lattice know your agent can complete specific tasks, add the
TaskDefinitions
in the entity's
TaskCatalog
component.
Lattice will not send a task to an agent for execution unless the task is
listed in the agent's TaskCatalog
.
The TaskCatalog
component specifies the types of tasks an agent can perform.
In the following example, the agent can execute:
- A VisualId task, which might involve identifying or tracking visual targets.
- An Investigate task, which might involve reconnaissance or surveillance.

"taskCatalog": {
"taskDefinitions": [
{
"taskSpecificationUrl": "type.googleapis.com/anduril.tasks.v2.VisualId"
},
{
"taskSpecificationUrl": "type.googleapis.com/anduril.tasks.v2.Investigate"
}
]
}
Each task definition requires a taskSpecificationUrl, which is a URL that
uniquely identifies the task's protobuf message type.
You will need to work with your Anduril representative to determine an appropriate task specification.
Publish asset
To configure a taskable asset, do the following:
- Import TaskCatalog and TaskDefinition:
from anduril.tasks.v2 import TaskCatalog, TaskDefinition
- Add the TaskCatalog component to the entity when using the PublishEntity API:
# Define the asset to publish. The names `id`, `creation_time`, `timenow`,
# `radius_degrees`, and `t` are expected to be defined by the surrounding program.
entity = Entity(
    entity_id=id,
    created_time=creation_time,
    aliases=Aliases(name="Simulated Drone"),
    # The entity expires ten minutes after this update unless it is republished.
    expiry_time=timenow + timedelta(minutes=10),
    mil_view=MilView(disposition=Disposition.FRIENDLY),
    location=Location(
        position=Position(
            # Move the entity in a circle.
            latitude_degrees=33.69447852698943 + (radius_degrees * math.cos(t)),
            longitude_degrees=-117.9173785693163 + (radius_degrees * math.sin(t)),
        )
    ),
    ontology=Ontology(
        template=Template.ASSET,
        platform_type="UAV",
    ),
    # Define a task catalog for the asset. Each task definition represents a task that
    # the asset can perform.
    task_catalog=TaskCatalog(
        task_definitions=[
            TaskDefinition(
                task_specification_url="type.googleapis.com/anduril.tasks.v2.VisualId"
            ),
            TaskDefinition(
                task_specification_url="type.googleapis.com/anduril.tasks.v2.Investigate"
            ),
        ]
    ),
    provenance=Provenance(
        integration_name="your_integration",
        data_type="your_data_type",
        source_update_time=timenow,
    ),
    is_live=True,
)
- Python
app.py
from anduril.entitymanager.v1 import *
from anduril.ontology.v1 import Disposition
from anduril.tasks.v2 import TaskCatalog, TaskDefinition
from grpclib.client import Channel
from uuid import uuid4
from datetime import datetime, timezone, timedelta
import asyncio
import math
import os
import sys
# Connection settings are read from the process environment.
lattice_url = os.environ.get('LATTICE_URL')
environment_token = os.environ.get('ENVIRONMENT_TOKEN')
# If you are developing on Lattice Sandboxes, also read the sandboxes token:
#
# sandboxes_token = os.environ.get('SANDBOXES_TOKEN')

# Both the URL and the bearer token are required; stop early if either is missing.
if not (environment_token and lattice_url):
    print("Make sure your Lattice URL and bearer token have been set as system environment variables.")
    sys.exit(1)

# If you are developing on Lattice Sandboxes, also require the sandboxes token:
# if not sandboxes_token:
#     print("Make sure your sandboxes token has been set as system environment variables.")
#     sys.exit(1)

# Headers sent with every request to Lattice.
metadata = {
    'authorization': f"Bearer {environment_token}",
    # If you are developing on Lattice Sandboxes, also send the sandboxes token:
    # 'anduril-sandbox-authorization': f"Bearer {sandboxes_token}"
}
async def publish_entity(entity):
    """Publish one entity to Lattice and return the service response.

    Opens a TLS channel to ``lattice_url`` on port 443, sends a
    ``PublishEntityRequest`` wrapping ``entity`` together with the
    module-level ``metadata``, and closes the channel before returning.

    Args:
        entity: The ``Entity`` message to publish.

    Returns:
        The response returned by ``EntityManagerApiStub.publish_entity``.
    """
    channel = Channel(host=lattice_url, port=443, ssl=True)
    try:
        stub = EntityManagerApiStub(channel)
        request = PublishEntityRequest(entity=entity)
        return await stub.publish_entity(request, metadata=metadata)
    finally:
        # Close the channel even when the call raises, so a failed publish
        # does not leave the connection open.
        channel.close()
async def main():
    """Continuously publish a simulated drone that moves in a circle.

    A single entity ID and creation time are generated once; every loop
    iteration advances the angle, rebuilds the entity at the new position
    with a fresh expiry and source-update time, and publishes it. Any
    exception ends the loop and is printed.
    """
    try:
        radius_degrees = .1
        creation_time = datetime.now(timezone.utc)
        # Angle counter in degrees. It starts at .1 and is advanced by .1
        # before each publish, so the first published angle is .2 degrees.
        count = .1
        entity_id = str(uuid4())
        print(f"Publishing simulated asset: {entity_id}")
        while True:
            timenow = datetime.now(timezone.utc)
            count += .1
            t = math.radians(count)
            entity = Entity(
                entity_id=entity_id,
                created_time=creation_time,
                aliases=Aliases(name="Simulated Drone"),
                # Keep the entity alive for ten minutes past this update.
                expiry_time=timenow + timedelta(minutes=10),
                mil_view=MilView(disposition=Disposition.FRIENDLY),
                location=Location(
                    position=Position(
                        # Move the entity in a circle.
                        latitude_degrees=33.69447852698943 + (radius_degrees * math.cos(t)),
                        longitude_degrees=-117.9173785693163 + (radius_degrees * math.sin(t)),
                    )
                ),
                ontology=Ontology(
                    template=Template.ASSET,
                    platform_type="UAV",
                ),
                # Define a task catalog for the asset. Each task definition represents a task that
                # the asset can perform.
                task_catalog=TaskCatalog(
                    task_definitions=[
                        TaskDefinition(
                            task_specification_url="type.googleapis.com/anduril.tasks.v2.VisualId"
                        ),
                        TaskDefinition(
                            task_specification_url="type.googleapis.com/anduril.tasks.v2.Investigate"
                        ),
                    ]
                ),
                provenance=Provenance(
                    integration_name="your_integration",
                    data_type="your_data_type",
                    source_update_time=timenow,
                ),
                is_live=True,
            )
            await publish_entity(entity)
            await asyncio.sleep(.01)
    except Exception as error:
        print("Exception:", error)


if __name__ == "__main__":
    asyncio.run(main())
Subscribe to tasks
Use the
ListenAsAgent
action to let your asset subscribe to tasks that Task Manager routes to it and
monitor for task updates.
If a task is assigned to an agent that is not monitoring this endpoint, the task will fail to send.
For example, the following snippet imports the Task Manager API and streams
tasks for the entityIds
listed in the request, to be executed by the agent:
- Python
# Open a TLS channel to Lattice and create a Task Manager client on it.
channel = Channel(host=lattice_url, port=443, ssl=True)
stub = TaskManagerApiStub(channel)
# Request the tasks that Task Manager routes to the listed agent entity IDs.
request = ListenAsAgentRequest(
entity_ids=EntityIds(
entity_ids=["$AGENT_ENTITY_ID"]
)
)
# NOTE(review): the result of listen_as_agent is consumed with `async for` in the
# full example on this page, so it is presumably a stream rather than a finished
# response. Confirm that the channel should only be closed after the stream has
# been consumed.
response = stub.listen_as_agent(request, metadata=metadata)
channel.close()
app.py
from anduril.taskmanager.v1 import ListenAsAgentRequest, EntityIds, TaskManagerApiStub
from grpclib.client import Channel
import asyncio
import os
import sys
# Connection settings are read from the process environment.
lattice_url = os.environ.get('LATTICE_URL')
environment_token = os.environ.get('ENVIRONMENT_TOKEN')
# If you are developing on Lattice Sandboxes, also read the sandboxes token:
#
# sandboxes_token = os.environ.get('SANDBOXES_TOKEN')

# Both the URL and the bearer token are required; stop early if either is missing.
if not (environment_token and lattice_url):
    print("Make sure your Lattice URL and bearer token have been set as system environment variables.")
    sys.exit(1)

# If you are developing on Lattice Sandboxes, also require the sandboxes token:
# if not sandboxes_token:
#     print("Make sure your sandboxes token has been set as system environment variables.")
#     sys.exit(1)

# Headers sent with every request to Lattice.
metadata = {
    'authorization': f"Bearer {environment_token}",
    # If you are developing on Lattice Sandboxes, also send the sandboxes token:
    # 'anduril-sandbox-authorization': f"Bearer {sandboxes_token}"
}
async def listen_as_agent():
    """Stream tasks routed to the agent and print each request as it arrives.

    Opens a TLS channel to ``lattice_url``, subscribes with
    ``ListenAsAgentRequest`` for ``$AGENT_ENTITY_ID``, and prints a line for
    every execute, complete, or cancel request received. Any exception is
    printed rather than raised.
    """
    try:
        channel = Channel(host=lattice_url, port=443, ssl=True)
        try:
            stub = TaskManagerApiStub(channel)
            request = ListenAsAgentRequest(
                entity_ids=EntityIds(
                    entity_ids=["$AGENT_ENTITY_ID"]
                )
            )
            # Keep the channel open for as long as the stream is being read.
            async for agent_request in stub.listen_as_agent(request, metadata=metadata):
                if agent_request.execute_request:
                    print(f"Executing Request: {agent_request.execute_request.task.description}")
                elif agent_request.complete_request:
                    print(f"Completing task: {agent_request.complete_request.task_id}")
                elif agent_request.cancel_request:
                    print(f"Cancelling task: {agent_request.cancel_request.task_id}")
        finally:
            # Close only after the stream has ended or failed.
            channel.close()
    except Exception as error:
        print(f"Exception: {error}")


if __name__ == "__main__":
    raise SystemExit(asyncio.run(listen_as_agent()))
Agents should gracefully handle stream interruptions. If the connection to Task Manager fails, the stream will close. Your system should handle potential stream errors and retry the connection to Task Manager when necessary. This ensures the agent maintains its subscription to task updates, even during transient network or service interruptions.
Update task status
The agent working on the task uses the
UpdateStatus
API action
to communicate any progress it has made on the assigned task back to Lattice. As the agent moves each task
through its lifecycle, Task Manager updates the common operating picture.
For example, the following imports the Task Manager API and sets the task status to EXECUTING
:
- Python
# Open a TLS channel to Lattice and create a Task Manager client on it.
channel = Channel(host=lattice_url, port=443, ssl=True)
stub = TaskManagerApiStub(channel)
# Report that the agent identified by $AGENT_ENTITY_ID is now executing $TASK_ID.
request = UpdateStatusRequest(
    status_update=StatusUpdate(
        version=TaskVersion(
            task_id="$TASK_ID",
            definition_version=1,
            status_version=1
        ),
        status=TaskStatus(
            status=Status.EXECUTING
        ),
        author=Principal(
            system=System(
                entity_id="$AGENT_ENTITY_ID"
            )
        )
    )
)
# NOTE: unary stub calls are awaited elsewhere on this page (see publish_entity).
# Inside a coroutine, write `response = await stub.update_status(...)` and close
# the channel only after the call has completed.
response = stub.update_status(request, metadata=metadata)
channel.close()
app.py
from anduril.taskmanager.v1 import *
from grpclib.client import Channel
import asyncio
import os
import sys
# Connection settings are read from the process environment.
lattice_url = os.environ.get('LATTICE_URL')
environment_token = os.environ.get('ENVIRONMENT_TOKEN')
# If you are developing on Lattice Sandboxes, also read the sandboxes token:
# sandboxes_token = os.environ.get('SANDBOXES_TOKEN')

# Both the URL and the bearer token are required; stop early if either is missing.
if not (environment_token and lattice_url):
    print("Make sure your Lattice URL and bearer token have been set as system environment variables.")
    sys.exit(1)

# If you are developing on Lattice Sandboxes, also require the sandboxes token:
# if not sandboxes_token:
#     print("Make sure your sandboxes token has been set as system environment variables.")
#     sys.exit(1)

# Headers sent with every request to Lattice.
metadata = {
    'authorization': f"Bearer {environment_token}",
    # If you are developing on Lattice Sandboxes, also send the sandboxes token:
    # 'anduril-sandbox-authorization': f"Bearer {sandboxes_token}"
}
async def listen_as_agent(entityId):
    """Subscribe to tasks routed to ``entityId`` and return the request stream.

    Args:
        entityId: ID of the agent entity whose tasks should be streamed.

    Returns:
        An async iterator of agent requests. The underlying channel stays
        open while the iterator is being consumed and is closed when
        iteration finishes, fails, or the iterator is closed.
    """
    channel = Channel(host=lattice_url, port=443, ssl=True)
    stub = TaskManagerApiStub(channel)
    request = ListenAsAgentRequest(
        entity_ids=EntityIds(
            entity_ids=[entityId]
        )
    )

    async def _stream():
        # The channel must outlive this function: the caller reads the stream
        # after listen_as_agent returns, so close it only once reading ends.
        try:
            async for agent_request in stub.listen_as_agent(request, metadata=metadata):
                yield agent_request
        finally:
            channel.close()

    return _stream()
async def execute_task(task_id, task_defintion_version, task_status_version,
                       agent_entity_id="$AGENT_ENTITY_ID"):
    """Report to Task Manager that the agent is executing a task.

    Args:
        task_id: ID of the task being updated.
        task_defintion_version: Definition version of the task, as received
            in the execute request.
        task_status_version: Status version of the task, as received in the
            execute request.
        agent_entity_id: Entity ID recorded as the author of the update.
            Defaults to the ``$AGENT_ENTITY_ID`` placeholder.

    Returns:
        The response returned by ``TaskManagerApiStub.update_status``.
    """
    channel = Channel(host=lattice_url, port=443, ssl=True)
    try:
        stub = TaskManagerApiStub(channel)
        request = UpdateStatusRequest(
            status_update=StatusUpdate(
                version=TaskVersion(
                    task_id=task_id,
                    definition_version=task_defintion_version,
                    status_version=task_status_version
                ),
                status=TaskStatus(
                    status=Status.EXECUTING
                ),
                author=Principal(
                    system=System(
                        entity_id=agent_entity_id
                    )
                )
            )
        )
        # Await the call so the update is actually sent and the caller
        # receives the response rather than an un-run coroutine.
        return await stub.update_status(request, metadata=metadata)
    finally:
        # Close only after the call has completed or failed.
        channel.close()
async def main():
    """Listen for tasks and mark each execute request as EXECUTING.

    Subscribes to the agent's task stream and, for every execute request,
    sends a status update with the task's ID and versions. Complete and
    cancel requests are printed. When the stream ends or an exception
    occurs, the subscription is re-established after a short pause.
    """
    # Pause between reconnect attempts so a persistent failure does not
    # turn the retry loop into a busy loop.
    retry_delay_seconds = 1
    while True:
        try:
            # Replace $AGENT_ENTITY_ID with the ID of the asset working on
            # your task.
            agent_requests = await listen_as_agent("$AGENT_ENTITY_ID")
            async for agent_request in agent_requests:
                if agent_request.execute_request:
                    # Parse the execute_request object.
                    version = agent_request.execute_request.task.version
                    task_id = version.task_id
                    task_definition_version = version.definition_version
                    task_status_version = version.status_version
                    print(f"Updating task {task_id} definition version {task_definition_version}, and status version {task_status_version}...")
                    response = await execute_task(task_id, task_definition_version, task_status_version)
                    print(f"Task status update response: {response}")
                elif agent_request.complete_request:
                    print(f"Completing task: {agent_request.complete_request.task_id}")
                elif agent_request.cancel_request:
                    print(f"Cancelling task: {agent_request.cancel_request.task_id}")
        except Exception as error:
            print("Exception:", error)
        await asyncio.sleep(retry_delay_seconds)


if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))
Endpoints for operators
The operator should use the following endpoints from Lattice UI or a third-party UI:
- CreateTask: creates a new task. Lattice calls the CreateTask endpoint after a task's details have been populated in the UI, and an operator presses "Execute Task" for the first time.
- GetTask: gets the state of an existing task.
- QueryTasks: finds tasks that match request criteria.
What's next
- To learn more about tasking, see Task an asset in the Lattice SDK sample applications guide.