Watch entities
This page explains how to get individual Lattice entities and how to stream a real-time feed of them. For more information about the API actions that let you receive entity data, see Get Entity (gRPC | HTTP) and Stream Entity (gRPC | HTTP).
Before you begin
Before you can configure your application to watch entities, set up your Lattice environment.
Get specific entities
To receive real-time data for a single entity, use the following examples:
- Go
- Rust
- C++
- Python
- curl
package main
import (
"context"
"log"
entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// BearerTokenAuth supplies PerRPCCredentials from a given token.
type BearerTokenAuth struct {
Token string
}
// GetRequestMetadata gets the current request metadata, adding the bearer token.
func (b *BearerTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "Bearer " + b.Token,
}, nil
}
// RequireTransportSecurity indicates whether the credentials requires transport security.
func (b *BearerTokenAuth) RequireTransportSecurity() bool {
return true // or false if you are developing/testing without TLS
}
// main connects to Lattice over TLS, authenticates every RPC with a bearer
// token, fetches a single entity by ID, and logs it.
func main() {
	ctx := context.Background()
	bearerToken := "$YOUR_BEARER_TOKEN"
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
		grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
	}

	conn, err := grpc.NewClient("$YOUR_LATTICE_URL", opts...)
	if err != nil {
		log.Fatalf("Did not connect: %v", err)
	}
	defer conn.Close()

	em := entitymanagerv1.NewEntityManagerAPIClient(conn)
	entityId := "test.entity.1"
	resp, err := em.GetEntity(ctx, &entitymanagerv1.GetEntityRequest{
		EntityId: entityId,
	})
	if err != nil {
		log.Fatalf("Error getting entity: %v, %v", entityId, err)
	}

	// Log the message through its pointer. Dereferencing a generated protobuf
	// message copies its internal state (reported by `go vet` copylocks) and
	// panics when the response carries no entity.
	log.Printf("Received entity: %v", resp.GetEntity())
}
use anduril_lattice_sdk::anduril::entitymanager::v1::entity_manager_api_client::EntityManagerApiClient;
use anduril_lattice_sdk::anduril::entitymanager::v1::{GetEntityRequest, GetEntityResponse};
use tonic::metadata::MetadataValue;
use tonic::transport::{ClientTlsConfig, Channel};
use tonic::{Request, Status};
/// Fetches a single entity from the Lattice Entity Manager API.
///
/// Opens a TLS channel to the configured endpoint, attaches the bearer token to
/// each request as `authorization` metadata, and issues one `get_entity` call.
///
/// # Errors
///
/// Returns a [`Status`] when the token cannot be parsed into a metadata value,
/// the endpoint is not a valid URI, TLS configuration or the connection fails,
/// or the RPC itself returns an error.
async fn get_entity() -> Result<GetEntityResponse, Status> {
    let token = "$YOUR_BEARER_TOKEN";
    let header_value: MetadataValue<_> = format!("Bearer {}", token)
        .parse()
        .map_err(|_| Status::internal("Invalid Bearer Token"))?;

    // No interpolation is needed, so build the owned string directly
    // instead of going through `format!`.
    let http_endpoint = String::from("$YOUR_LATTICE_URL");
    let registration_channel = Channel::from_shared(http_endpoint)
        .map_err(|e| Status::invalid_argument(format!("Invalid HTTP endpoint: {}", e)))?
        .tls_config(ClientTlsConfig::new().with_native_roots())
        .map_err(|_| Status::internal("TLS configuration failed"))?
        .connect()
        .await
        .map_err(|e| Status::unavailable(format!("Failed to connect: {}", e)))?;

    // The interceptor stamps the bearer token onto every outgoing request.
    let mut em_client = EntityManagerApiClient::with_interceptor(
        registration_channel,
        |mut req: Request<()>| {
            req.metadata_mut()
                .insert("authorization", header_value.clone());
            Ok(req)
        },
    );

    let response = em_client
        .get_entity(GetEntityRequest {
            entity_id: String::from("$ENTITY_ID"),
        })
        .await?;
    Ok(response.into_inner())
}
/// Runs [`get_entity`] once and reports the outcome: the response is printed to
/// stdout with `Debug` formatting, an error is printed to stderr.
///
/// The function returns `Ok(())` in both cases.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    match get_entity().await {
        Err(status) => eprintln!("Error: {}", status),
        Ok(response) => println!("{:?}", response),
    }
    Ok(())
}
#include <grpc/grpc.h>
#include <grpcpp/channel.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <anduril/entitymanager/v1/entity_manager_api.pub.grpc.pb.h>

#include <iostream>
#include <memory>
#include <string>

// Fetches one entity from the Lattice Entity Manager API over an SSL-secured
// gRPC channel. On success the entity is printed to stdout; on failure the
// gRPC error code and message are printed to stderr.
int main(int argc, char *argv[]) {
  GOOGLE_PROTOBUF_VERIFY_VERSION;

  // The endpoint is a string; replace the placeholder with your Lattice URL.
  const std::string url = "$YOUR_LATTICE_URL";

  grpc::ClientContext ctx;
  // Setting custom metadata to be sent to the server
  ctx.AddMetadata("authorization", "Bearer $YOUR_BEARER_TOKEN");

  auto creds = grpc::SslCredentials(grpc::SslCredentialsOptions());
  std::shared_ptr<anduril::entitymanager::v1::EntityManagerAPI::Stub> mStub =
      anduril::entitymanager::v1::EntityManagerAPI::NewStub(grpc::CreateChannel(url, creds));

  anduril::entitymanager::v1::GetEntityRequest req;
  req.set_entity_id("$ENTITY_ID");
  anduril::entitymanager::v1::GetEntityResponse res;

  grpc::Status status = mStub->GetEntity(&ctx, req, &res);
  if (!status.ok()) {
    std::cerr << "Error code: " << status.error_code() << std::endl;
    std::cerr << "Error message: " << status.error_message() << std::endl;
  } else {
    std::cout << res.entity().DebugString() << std::endl;
  }
  return 0;
}
from anduril.entitymanager.v1 import EntityManagerApiStub, GetEntityRequest
from grpclib.client import Channel
import asyncio
# Request metadata carrying the bearer token; passed along with each RPC below.
metadata = dict(authorization='Bearer $YOUR_BEARER_TOKEN')
# Get an entity's information
async def get_entity(entity_id):
    """Fetch one entity by ID and return the GetEntity response.

    Opens a secure channel to the Lattice host on port 443, sends a
    ``GetEntityRequest`` with the module-level ``metadata``, and closes the
    channel before returning, including when the request raises.

    Args:
        entity_id: ID of the entity to fetch.

    Returns:
        The response returned by ``EntityManagerApiStub.get_entity``.
    """
    # open secure channel
    channel = Channel(host="$YOUR_LATTICE_URL", port=443, ssl=True)
    try:
        # create service instance
        entity_manager_stub = EntityManagerApiStub(channel)
        # get the entity
        return await entity_manager_stub.get_entity(
            GetEntityRequest(entity_id=entity_id), metadata=metadata
        )
    finally:
        # close the channel even if the request failed
        channel.close()
if __name__ == "__main__":
    # can replace with id of any entity in the UI
    response = asyncio.run(get_entity("test.entity.1"))
    # print() returns None, so SystemExit is raised with no status value.
    raise SystemExit(print(response.entity))
Use `curl` in your command-line tool, as shown in the following example:
# Fetch a single entity as JSON, following redirects.
curl --location --request GET 'https://$YOUR_LATTICE_URL/api/v1/entities/$ENTITY_ID' \
  --header 'Authorization: Bearer $YOUR_BEARER_TOKEN' \
  --header 'Accept: application/json'
Replace $ENTITY_ID with your entity's unique ID.
Stream entities
If you need to watch a real-time feed of entities, use the `StreamEntityComponents` API. This API delivers a stream of entity events as the state of the common operational picture changes.
- Go
- Rust
- Python
- curl
package main
import (
"context"
"log"
entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// BearerTokenAuth supplies PerRPCCredentials from a given token.
type BearerTokenAuth struct {
Token string
}
// GetRequestMetadata gets the current request metadata, adding the bearer token.
func (b *BearerTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "Bearer " + b.Token,
}, nil
}
// RequireTransportSecurity indicates whether the credentials requires transport security.
func (b *BearerTokenAuth) RequireTransportSecurity() bool {
return true // or false if you are developing/testing without TLS
}
func main() {
ctx := context.Background()
bearerToken := "$YOUR_BEARER_TOKEN"
opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
}
conn, err := grpc.NewClient("$YOUR_LATTICE_URL", opts...)
if err != nil {
log.Fatalf("Did not connect: %v", err)
}
defer conn.Close()
em := entitymanagerv1.NewEntityManagerAPIClient(conn)
//Start stream for all Entity Events including all components in the entity
streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
IncludeAllComponents: true,
})
if err != nil {
log.Fatalf("Error initiating entity stream: %v", err)
}
for {
//Receive single entity event response
resp, err := streamClient.Recv()
if err != nil {
log.Fatalf("Error receiving from entity stream: %v", err)
}
log.Printf("Received entity event: %v", resp.GetEntityEvent())
}
}
use anduril_lattice_sdk::anduril::entitymanager::v1::entity_manager_api_client::EntityManagerApiClient;
use anduril_lattice_sdk::anduril::entitymanager::v1::{GetEntityRequest, GetEntityResponse};
use tonic::metadata::MetadataValue;
use tonic::transport::{ClientTlsConfig, Channel};
use tonic::{Request, Status};
async fn stream_entities() -> Result<(), Box<dyn std::error::Error>>{
let token = "$YOUR_BEARER_TOKEN";
let bearer_token = format!("Bearer {}", token);
let header_value: MetadataValue<_> = bearer_token.parse().map_err(|_| Status::internal("Invalid Bearer Token"))?;
let http_endpoint = format!("$YOUR_LATTICE_URL");
let registration_channel = Channel::from_shared(http_endpoint)
.map_err(|e| Status::invalid_argument(format!("Invalid HTTP endpoint: {}", e)))?
.tls_config(ClientTlsConfig::new().with_native_roots())
.map_err(|_| Status::internal("TLS configuration failed"))?
.connect()
.await
.map_err(|e| Status::unavailable(format!("Failed to connect: {}", e)))?;
let mut em_client = EntityManagerApiClient::with_interceptor(registration_channel, |mut req: Request<()>| {
req.metadata_mut().insert("authorization", header_value.clone());
Ok(req)
});
let mut stream = em_client.stream_entity_components(StreamEntityComponentsRequest{
components_to_include: vec![],
heartbeat_period_millis: 250,
include_all_components: true,
filter: None,
rate_limit: None,
preexisting_only: false,
}).await?.into_inner();
while let Some(resp) = stream.message().await? {
let resp: StreamEntityComponentsResponse = resp;
println!("Received event: {:?}", resp);
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let stream = stream_entities().await;
Ok(stream?)
}
from anduril.entitymanager.v1 import EntityManagerApiStub, StreamEntityComponentsRequest
from grpclib.client import Channel
import asyncio
# Request metadata carrying the bearer token; passed along with the streaming RPC below.
metadata = dict(authorization='Bearer $YOUR_BEARER_TOKEN')
async def stream_entities():
    """Stream entity events and print the entity carried by each one.

    Opens a secure channel to the Lattice host on port 443, requests a stream
    with all components included, and prints ``event.entity_event.entity`` for
    every event received. The channel is closed when iteration ends or raises.
    """
    # open secure channel
    channel = Channel(host="$YOUR_LATTICE_URL", port=443, ssl=True)
    try:
        # create service instance
        entity_manager_stub = EntityManagerApiStub(channel)
        # open the entity event stream
        stream = entity_manager_stub.stream_entity_components(
            StreamEntityComponentsRequest(include_all_components=True),
            metadata=metadata
        )
        async for event in stream:
            print(event.entity_event.entity)
    finally:
        # close the channel once the stream ends or fails
        channel.close()
if __name__ == "__main__":
    # Whatever the coroutine returns is used as the SystemExit status.
    exit_status = asyncio.run(stream_entities())
    raise SystemExit(exit_status)
# Request entity events with all components included, following redirects.
curl --location --request POST 'https://$YOUR_LATTICE_URL/api/v1/entities/events' \
  --header 'Authorization: Bearer $YOUR_BEARER_TOKEN' \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/json' \
  --data-raw '{ "includeAllComponents": true }'
Stream specific components
You can stream a subset of an entity's components by providing each component name in `snake_case`. The `StreamEntityComponents` API then streams only changes to those components:
- Go
- Rust
Use the Lattice SDK for Go to stream entities with only the `location_uncertainty` and `power_state` components included. The caller can expect the stream to produce events if and only if these components change.
package main
import (
"context"
"log"
entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// BearerTokenAuth supplies PerRPCCredentials from a given token.
type BearerTokenAuth struct {
Token string
}
// GetRequestMetadata gets the current request metadata, adding the bearer token.
func (b *BearerTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "Bearer " + b.Token,
}, nil
}
// RequireTransportSecurity indicates whether the credentials requires transport security.
func (b *BearerTokenAuth) RequireTransportSecurity() bool {
return true // or false if you are developing/testing without TLS
}
func main() {
ctx := context.Background()
bearerToken := "$YOUR_BEARER_TOKEN"
opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
}
conn, err := grpc.NewClient("$YOUR_LATTICE_URL", opts...)
if err != nil {
log.Fatalf("Did not connect: %v", err)
}
defer conn.Close()
em := entitymanagerv1.NewEntityManagerAPIClient(conn)
//Start stream for Entity Events containing a change in the following components: location_uncertainty, power_state
streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
ComponentsToInclude: []string{"location_uncertainty", "power_state"},
})
if err != nil {
log.Fatalf("Error initiating entity stream: %v", err)
}
for {
//Receive single entity event response
resp, err := streamClient.Recv()
if err != nil {
log.Fatalf("Error receiving from entity stream: %v", err)
}
log.Printf("Received entity event: %v", resp.GetEntityEvent())
}
}
Use the Lattice SDK for Rust to stream entities with only the `location_uncertainty` and `aliases` components included. The caller can expect the stream to produce events if and only if these components change.
use anduril_lattice_sdk::anduril::entitymanager::v1::entity_manager_api_client::EntityManagerApiClient;
use anduril_lattice_sdk::anduril::entitymanager::v1::{GetEntityRequest, GetEntityResponse};
use tonic::metadata::MetadataValue;
use tonic::transport::{ClientTlsConfig, Channel};
use tonic::{Request, Status};
async fn stream_entities_subcomponents() -> Result<(), Box<dyn std::error::Error>>{
let token = "$YOUR_BEARER_TOKEN";
let bearer_token = format!("Bearer {}", token);
let header_value: MetadataValue<_> = bearer_token.parse().map_err(|_| Status::internal("Invalid Bearer Token"))?;
let http_endpoint = format!("$YOUR_LATTICE_URL");
let registration_channel = Channel::from_shared(http_endpoint)
.map_err(|e| Status::invalid_argument(format!("Invalid HTTP endpoint: {}", e)))?
.tls_config(ClientTlsConfig::new().with_native_roots())
.map_err(|_| Status::internal("TLS configuration failed"))?
.connect()
.await
.map_err(|e| Status::unavailable(format!("Failed to connect: {}", e)))?;
let mut em_client = EntityManagerApiClient::with_interceptor(registration_channel, |mut req: Request<()>| {
req.metadata_mut().insert("authorization", header_value.clone());
Ok(req)
});
let mut stream = em_client.stream_entity_components(StreamEntityComponentsRequest{
components_to_include: vec!["location_uncertainty".to_string(), "aliases".to_string()],
heartbeat_period_millis: 250,
include_all_components: false,
filter: None,
rate_limit: None,
preexisting_only: false,
}).await?.into_inner();
while let Some(resp) = stream.message().await? {
let resp: StreamEntityComponentsResponse = resp;
println!("Received event: {:?}", resp);
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let stream = stream_entities_subcomponents().await;
Ok(stream?)
}
Apply filtering
This feature is currently only available using Lattice SDKs for gRPC.
The `StreamEntityComponents` API lets you filter against any field of the `entity` component.
Here's an example of an `OR` predicate filter. It matches an entity if `mil_view.disposition` is `SUSPICIOUS` or `HOSTILE`.
Note that disposition is an enum, so its JSON value is just a number.
{
"Operation": {
"Or": {
"Children": {
"PredicateSet": {
"predicates": [
{
"field_path": "mil_view.disposition",
"value": {
"Type": {
"EnumType": {
"value": 2
}
}
},
"comparator": 1
},
{
"field_path": "mil_view.disposition",
"value": {
"Type": {
"EnumType": {
"value": 3
}
}
},
"comparator": 1
}
]
}
}
}
}
}
Using the above filter definition in Go, we can stream entities that match this filter.
package main

import (
	"context"
	"log"

	entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
	ontologyv1 "github.com/anduril/lattice-sdk-go/src/anduril/ontology/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// BearerTokenAuth supplies PerRPCCredentials from a given token.
type BearerTokenAuth struct {
	// Token is the raw token value, without the "Bearer " prefix.
	Token string
}

// GetRequestMetadata gets the current request metadata, adding the bearer token.
func (b *BearerTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{
		"authorization": "Bearer " + b.Token,
	}, nil
}

// RequireTransportSecurity indicates whether the credentials requires transport security.
func (b *BearerTokenAuth) RequireTransportSecurity() bool {
	return true // or false if you are developing/testing without TLS
}

// dispositionEquals builds a predicate that matches entities whose
// mil_view.disposition equals the given enum value.
func dispositionEquals(disposition ontologyv1.Disposition) *entitymanagerv1.Predicate {
	return &entitymanagerv1.Predicate{
		FieldPath: "mil_view.disposition",
		Value: &entitymanagerv1.Value{
			Type: &entitymanagerv1.Value_EnumType{
				EnumType: &entitymanagerv1.EnumType{
					Value: int32(disposition),
				},
			},
		},
		Comparator: entitymanagerv1.Comparator_COMPARATOR_EQUALITY,
	}
}

// main opens an authenticated TLS connection to Lattice and logs entity events
// from a StreamEntityComponents stream filtered to hostile or suspicious
// entities. It runs until the first error, which terminates the process
// through log.Fatalf.
func main() {
	ctx := context.Background()
	bearerToken := "$YOUR_BEARER_TOKEN"
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
		grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
	}

	conn, err := grpc.NewClient("$YOUR_LATTICE_URL", opts...)
	if err != nil {
		log.Fatalf("Did not connect: %v", err)
	}
	defer conn.Close()

	em := entitymanagerv1.NewEntityManagerAPIClient(conn)

	// Filter Statement that defines 2 OR predicates that filter suspicious or
	// hostile entities
	filter := &entitymanagerv1.Statement{
		Operation: &entitymanagerv1.Statement_Or{
			Or: &entitymanagerv1.OrOperation{
				Children: &entitymanagerv1.OrOperation_PredicateSet{
					PredicateSet: &entitymanagerv1.PredicateSet{
						Predicates: []*entitymanagerv1.Predicate{
							dispositionEquals(ontologyv1.Disposition_DISPOSITION_HOSTILE),
							dispositionEquals(ontologyv1.Disposition_DISPOSITION_SUSPICIOUS),
						},
					},
				},
			},
		},
	}

	streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
		Filter:               filter,
		IncludeAllComponents: true,
	})
	if err != nil {
		log.Fatalf("Error initiating entity stream: %v", err)
	}

	for {
		// Receive single entity event response
		resp, err := streamClient.Recv()
		if err != nil {
			log.Fatalf("Error receiving from entity stream: %v", err)
		}
		log.Printf("Received filtered entity event: %v", resp.GetEntityEvent())
	}
}