Skip to main content

Integrate with Lattice Tasking

This page describes how to integrate with Task Manager, the service that lets you create, update, monitor, and execute a task. For reference see TaskManager (gRPC | HTTP).

In an integration with Lattice, operators and taskable agents interact with Task Manager to progress a task through its full lifecycle. Task Manager handles task state management and the correct routing of tasks to taskable agents.

If you're integrating an agent with Lattice's tasking infrastructure, your robot should only use the following endpoints:

  • ListenAsAgent: To monitor for, and receive, tasks.
  • UpdateStatus: To report real-time status updates to Task Manager as the agent makes progress through a task.

Before you begin

  • To publish taskable entities, and subscribe to tasks, set up your Lattice environment.
  • Familiarize yourself with entities and different entity types.

Define tasks for an agent

An agent is an entity, typically an asset, or group of entities, that can complete specific tasks. To let Lattice know your agent can complete specific tasks, add the TaskDefinitions in the entity's TaskCatalog component.

Note

Lattice will not send a task to an agent for execution unless the task is listed in the agent's TaskCatalog.

The TaskCatalog component specifies the types of tasks an agent can perform. In the following example, the agent can execute:

  • A VisualId task, which might involve identifying or tracking visual targets.

  • An Investigate task, which might involve reconnaissance or surveillance.

    "taskCatalog": {
    "taskDefinitions": [
    {
    "taskSpecificationUrl": "type.googleapis.com/anduril.tasks.v2.VisualId"
    },
    {
    "taskSpecificationUrl": "type.googleapis.com/anduril.tasks.v2.Investigate"
    }
    ]
    }

Each task definition requires a taskSpecificationUrl, which is a URL that uniquely identifies the task's protobuf message type.

Note

You will need to work with your Anduril representative to determine an appropriate task specification.

Publish asset

To configure a taskable asset, do the following:

  1. Import TaskCatalog and TaskDefinition:
app.py
from anduril.tasks.v2 import TaskCatalog, TaskDefinition
  2. Add the TaskCatalog component to the entity when using the PublishEntity API:
app.py
# Define the asset to publish. The names id, creation_time, timenow,
# radius_degrees and t are expected to be defined by the surrounding code.
entity = Entity(
    entity_id=id,
    created_time=creation_time,
    aliases=Aliases(name="Simulated Drone"),
    expiry_time=timenow + timedelta(minutes=10),
    mil_view=MilView(disposition=Disposition.FRIENDLY),
    location=Location(
        position=Position(
            # Move the entity in a circle.
            latitude_degrees=33.69447852698943 + (radius_degrees * math.cos(t)),
            longitude_degrees=-117.9173785693163 + (radius_degrees * math.sin(t)))),
    ontology=Ontology(
        template=Template.ASSET,
        platform_type="UAV"
    ),
    # Define a task catalog for the asset. Each task definition represents a task that
    # the asset can perform.
    task_catalog=TaskCatalog(
        task_definitions=[
            TaskDefinition(
                task_specification_url="type.googleapis.com/anduril.tasks.v2.VisualId"
            ),
            TaskDefinition(
                task_specification_url="type.googleapis.com/anduril.tasks.v2.Investigate"
            )
        ]
    ),
    provenance=Provenance(
        integration_name="your_integration",
        data_type="your_data_type",
        source_update_time=timenow),
    is_live=True)
app.py
from anduril.entitymanager.v1 import *
from anduril.ontology.v1 import Disposition
from anduril.tasks.v2 import TaskCatalog, TaskDefinition

from grpclib.client import Channel


from uuid import uuid4
from datetime import datetime, timezone, timedelta
import asyncio
import math
import os
import sys

# Connection settings are read from the process environment:
#   LATTICE_URL        - host of your Lattice environment
#   ENVIRONMENT_TOKEN  - bearer token used to authorize requests
environment_token = os.getenv('ENVIRONMENT_TOKEN')
lattice_url = os.getenv('LATTICE_URL')

# Add the following statement to retrieve the sandboxes token
# if you are developing on Lattice Sandboxes.
#
# sandboxes_token = os.getenv('SANDBOXES_TOKEN')

# Both settings are required; exit with a hint when either is unset or empty.
if not (lattice_url and environment_token):
    print("Make sure your Lattice URL and bearer token have been set as system environment variables.")
    sys.exit(1)

# Add the following statement if you are developing on Lattice Sandboxes.
# if not sandboxes_token:
#     print("Make sure your sandboxes token has been set as system environment variables.")
#     sys.exit(1)

# Metadata passed with every gRPC call.
metadata = {
    'authorization': f"Bearer {environment_token}",
    # Add the following statement if you are developing on Lattice Sandboxes.
    # 'anduril-sandbox-authorization': f"Bearer {sandboxes_token}"
}
async def publish_entity(entity):
    """Publish ``entity`` to Lattice and return the service response.

    Opens a TLS channel to ``lattice_url`` on port 443, sends a single
    ``PublishEntityRequest`` with the module-level ``metadata``, and closes
    the channel again before returning.

    Args:
        entity: The ``Entity`` message to publish.

    Returns:
        The response returned by ``EntityManagerApiStub.publish_entity``.
    """
    channel = Channel(host=lattice_url, port=443, ssl=True)
    try:
        stub = EntityManagerApiStub(channel)
        request = PublishEntityRequest(entity=entity)
        return await stub.publish_entity(request, metadata=metadata)
    finally:
        # Release the channel even when the call raises, so a failed publish
        # does not leak the underlying connection.
        channel.close()

async def main():
    """Continuously publish a simulated, taskable drone that moves in a circle.

    The entity ID and creation time are generated once. Each loop iteration
    advances the angle, rebuilds the entity at its new position with a fresh
    expiry time, publishes it, and sleeps briefly. Any exception ends the
    loop and is printed.
    """
    try:
        radius_degrees = .1
        creation_time = datetime.now(timezone.utc)
        id = str(uuid4())

        # Angle around the circle in degrees (converted with math.radians
        # below). It starts at 0.1 so that, after the in-loop increment, the
        # first published position uses 0.2.
        count = .1

        # Define a task catalog for the asset. Each task definition represents
        # a task that the asset can perform. The catalog is the same for every
        # publish, so it is built once outside the loop.
        task_catalog = TaskCatalog(
            task_definitions=[
                TaskDefinition(
                    task_specification_url="type.googleapis.com/anduril.tasks.v2.VisualId"
                ),
                TaskDefinition(
                    task_specification_url="type.googleapis.com/anduril.tasks.v2.Investigate"
                )
            ]
        )

        print(f"Publishing simulated asset: {id}")

        while True:
            timenow = datetime.now(timezone.utc)
            count += .1
            t = math.radians(count)

            entity = Entity(
                entity_id=id,
                created_time=creation_time,
                aliases=Aliases(name="Simulated Drone"),
                # Push the expiry forward on every publish.
                expiry_time=timenow + timedelta(minutes=10),
                mil_view=MilView(disposition=Disposition.FRIENDLY),
                location=Location(
                    position=Position(
                        # Move the entity in a circle.
                        latitude_degrees=33.69447852698943 + (radius_degrees * math.cos(t)),
                        longitude_degrees=-117.9173785693163 + (radius_degrees * math.sin(t)))),
                ontology=Ontology(
                    template=Template.ASSET,
                    platform_type="UAV"
                ),
                task_catalog=task_catalog,
                provenance=Provenance(
                    integration_name="your_integration",
                    data_type="your_data_type",
                    source_update_time=timenow),
                is_live=True)

            await publish_entity(entity)

            await asyncio.sleep(.01)

    except Exception as error:
        print("Exception:", error)


if __name__ == "__main__":
    asyncio.run(main())

Subscribe to tasks

Use the ListenAsAgent action to let your asset subscribe to tasks that Task Manager routes to it and monitor for task updates.

If a task is assigned to an agent that is not monitoring this endpoint, the task will fail to send.

For example, the following snippet imports the Task Manager API and streams tasks for the entityIds listed in the request, to be executed by the agent:

app.py
# Open a TLS channel to Lattice and create the Task Manager client on it.
channel = Channel(host=lattice_url, port=443, ssl=True)
stub = TaskManagerApiStub(channel)

# Subscribe for the agent(s) identified by these entity IDs.
agent_ids = EntityIds(entity_ids=["$AGENT_ENTITY_ID"])
request = ListenAsAgentRequest(entity_ids=agent_ids)

# Read the incoming requests by iterating `response` with `async for`.
response = stub.listen_as_agent(request, metadata=metadata)

# NOTE(review): in a real agent, close the channel only after you have
# finished reading from `response`.
channel.close()
app.py
app.py
from anduril.taskmanager.v1 import ListenAsAgentRequest, EntityIds, TaskManagerApiStub
from grpclib.client import Channel

import asyncio
import os
import sys

# The Lattice URL and the environment (bearer) token come from environment
# variables.
environment_token = os.getenv('ENVIRONMENT_TOKEN')
lattice_url = os.getenv('LATTICE_URL')

# Add the following statement to retrieve the sandboxes token
# if you are developing on Lattice Sandboxes.
#
# sandboxes_token = os.getenv('SANDBOXES_TOKEN')

# Stop early, with a hint, unless both values are present and non-empty.
if not (lattice_url and environment_token):
    print("Make sure your Lattice URL and bearer token have been set as system environment variables.")
    sys.exit(1)

# Add the following statement if you are developing on Lattice Sandboxes.
# if not sandboxes_token:
#     print("Make sure your sandboxes token has been set as system environment variables.")
#     sys.exit(1)

# Metadata attached to each Task Manager call.
metadata = {
    'authorization': f"Bearer {environment_token}",
    # Add the following statement if you are developing on Lattice Sandboxes.
    # 'anduril-sandbox-authorization': f"Bearer {sandboxes_token}"
}

async def listen_as_agent():
    """Subscribe to Task Manager as an agent and print each incoming request.

    Opens a TLS channel to ``lattice_url``, calls ``ListenAsAgent`` for the
    placeholder entity ID ``$AGENT_ENTITY_ID``, and prints one line per
    execute, complete, or cancel request received. Exceptions are caught and
    printed. The function returns ``None`` when the stream ends.
    """
    try:
        channel = Channel(host=lattice_url, port=443, ssl=True)
        try:
            stub = TaskManagerApiStub(channel)
            request = ListenAsAgentRequest(
                entity_ids=EntityIds(
                    # Replace $AGENT_ENTITY_ID with the ID of your agent.
                    entity_ids=["$AGENT_ENTITY_ID"]
                )
            )
            requests = stub.listen_as_agent(request, metadata=metadata)

            async for agent_request in requests:
                if agent_request.execute_request:
                    print(f"Executing Request: {agent_request.execute_request.task.description}")
                elif agent_request.complete_request:
                    print(f"Completing task: {agent_request.complete_request.task_id}")
                elif agent_request.cancel_request:
                    print(f"Cancelling task: {agent_request.cancel_request.task_id}")
        finally:
            # Keep the channel open while the stream is being read, and
            # always release it once the stream ends or fails.
            channel.close()
    except Exception as error:
        print(f"Exception: {error}")


if __name__ == "__main__":
    raise SystemExit(asyncio.run(listen_as_agent()))
Warning

Agents should gracefully handle stream interruptions. If the connection to Task Manager fails, the stream will close. Your system should handle potential stream errors and retry the connection to Task Manager when necessary. This ensures the agent maintains its subscription to task updates, even during transient network or service interruptions.

Update task status

The agent working on the task uses the UpdateStatus API action to communicate any progress it has made on the assigned task back to Lattice. As the agent moves each task through its lifecycle, Task Manager updates the common operating picture.

For example, the following imports the Task Manager API and sets the task status to EXECUTING:

app.py
# Open a TLS channel to Lattice and create the Task Manager client on it.
channel = Channel(host=lattice_url, port=443, ssl=True)
stub = TaskManagerApiStub(channel)

# Report that the agent has started executing the task. Replace $TASK_ID and
# $AGENT_ENTITY_ID with your task's ID and your agent's entity ID.
request = UpdateStatusRequest(
    status_update=StatusUpdate(
        version=TaskVersion(
            task_id="$TASK_ID",
            definition_version=1,
            status_version=1
        ),
        status=TaskStatus(
            status=Status.EXECUTING
        ),
        author=Principal(
            system=System(
                entity_id="$AGENT_ENTITY_ID"
            )
        )
    )
)
# NOTE(review): with the async client this call presumably needs to be
# awaited from inside an `async def` - confirm against the generated stub.
response = stub.update_status(request, metadata=metadata)
channel.close()
app.py
app.py
from anduril.taskmanager.v1 import *
from grpclib.client import Channel

import asyncio
import os
import sys

# Read the Lattice URL and the environment (bearer) token from the process
# environment.
environment_token = os.getenv('ENVIRONMENT_TOKEN')
lattice_url = os.getenv('LATTICE_URL')

# Retrieve the sandboxes token if you are developing on Lattice Sandboxes.
# sandboxes_token = os.getenv('SANDBOXES_TOKEN')

# Both values must be present and non-empty; otherwise exit with a hint.
if not (lattice_url and environment_token):
    print("Make sure your Lattice URL and bearer token have been set as system environment variables.")
    sys.exit(1)

# Add the following statement if you are developing on Lattice Sandboxes.
# if not sandboxes_token:
#     print("Make sure your sandboxes token has been set as system environment variables.")
#     sys.exit(1)


# Metadata attached to each Task Manager call.
metadata = {
    'authorization': f"Bearer {environment_token}",
    # Add the following statement if you are developing on Lattice Sandboxes.
    # 'anduril-sandbox-authorization': f"Bearer {sandboxes_token}"
}

async def listen_as_agent(entityId):
    """Subscribe to Task Manager as the agent ``entityId``.

    Args:
        entityId: Entity ID of the agent to listen for.

    Returns:
        An async iterator over the requests received from ``ListenAsAgent``.
        Iterate it with ``async for``. The channel is closed when the
        iteration ends or fails.
    """
    channel = Channel(host=lattice_url, port=443, ssl=True)
    stub = TaskManagerApiStub(channel)
    request = ListenAsAgentRequest(
        entity_ids=EntityIds(
            entity_ids=[entityId]
        )
    )

    async def stream():
        # Keep the channel open for as long as the caller is reading from the
        # stream; closing it before returning would tear down the connection
        # the stream depends on.
        try:
            async for agent_request in stub.listen_as_agent(request, metadata=metadata):
                yield agent_request
        finally:
            channel.close()

    return stream()

async def execute_task(task_id, task_defintion_version, task_status_version,
                       agent_entity_id="$AGENT_ENTITY_ID"):
    """Report to Task Manager that the agent is executing a task.

    Sends an ``UpdateStatusRequest`` that sets the task's status to
    ``Status.EXECUTING``.

    Args:
        task_id: ID of the task being updated.
        task_defintion_version: Definition version of the task.
        task_status_version: Status version of the task.
        agent_entity_id: Entity ID recorded as the author of the update.
            Defaults to the ``$AGENT_ENTITY_ID`` placeholder; replace it with
            the ID of the asset working on the task.

    Returns:
        The response returned by ``TaskManagerApiStub.update_status``.
    """
    channel = Channel(host=lattice_url, port=443, ssl=True)
    try:
        stub = TaskManagerApiStub(channel)
        request = UpdateStatusRequest(
            status_update=StatusUpdate(
                version=TaskVersion(
                    task_id=task_id,
                    definition_version=task_defintion_version,
                    status_version=task_status_version
                ),
                status=TaskStatus(
                    status=Status.EXECUTING
                ),
                author=Principal(
                    system=System(
                        entity_id=agent_entity_id
                    )
                )
            )
        )
        # Await the call so the update is sent and the actual response,
        # rather than a pending coroutine, is returned to the caller.
        return await stub.update_status(request, metadata=metadata)
    finally:
        # Release the channel whether or not the update succeeded.
        channel.close()

async def main(retry_delay_seconds=1.0):
    """Listen for tasks as an agent and mark each received task as executing.

    Runs forever. Whenever the stream ends or an exception is raised, the
    error (if any) is printed and the subscription is re-established after a
    short pause.

    Args:
        retry_delay_seconds: Seconds to wait before re-subscribing after the
            stream ends or fails. This stops a persistent failure from
            turning into a tight reconnect loop.
    """
    while True:
        try:
            # Replace $AGENT_ENTITY_ID with the ID of the asset working on
            # your task.
            agent_request = await listen_as_agent("$AGENT_ENTITY_ID")
            async for request in agent_request:
                if request.execute_request:
                    # Parse the execute_request object.
                    version = request.execute_request.task.version
                    task_id = version.task_id
                    task_defintion_version = version.definition_version
                    task_status_version = version.status_version

                    print(f"Updating task {task_id} definition version {task_defintion_version}, and status version {task_status_version}...")
                    response = await execute_task(task_id, task_defintion_version, task_status_version)
                    print(f"Task status update response: {response}")

                elif request.complete_request:
                    print(f"Completing task: {request.complete_request.task_id}")
                elif request.cancel_request:
                    print(f"Cancelling task: {request.cancel_request.task_id}")
        except Exception as error:
            print("Exception:", error)

        # Pause before reconnecting.
        await asyncio.sleep(retry_delay_seconds)


if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))

Endpoints for operators

The operator should use the following endpoints from Lattice UI or a third-party UI:

  • CreateTask: creates a new task. Lattice calls the CreateTask endpoint after a task's details have been populated in the UI, and an operator presses "Execute Task" for the first time.
  • GetTask: gets the state of an existing task.
  • QueryTasks: finds tasks that match request criteria.

What's next

  • To learn more about tasking, see Task an asset in the Lattice SDK sample applications guide.