Skip to main content

Watch entities

This page explains how to get and stream a real-time feed of Lattice entities. For more information about the API actions that let you receive entity data, see Get Entity (gRPC | HTTP) and Stream Entity (gRPC | HTTP).

Before you begin

Before you can configure your application to watch entities, set up your Lattice environment.

Get specific entities

To receive real-time data for a single entity, use the following examples:

package main
import (
"context"
"log"

entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// BearerTokenAuth holds a bearer token and attaches it to each RPC as
// per-RPC credentials.
type BearerTokenAuth struct {
	Token string
}

// GetRequestMetadata returns the metadata to send with a request: a single
// "authorization" entry carrying the bearer token. It never returns an error.
func (auth *BearerTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	md := make(map[string]string, 1)
	md["authorization"] = "Bearer " + auth.Token
	return md, nil
}

// RequireTransportSecurity reports whether these credentials must be sent
// over a secure transport.
func (auth *BearerTokenAuth) RequireTransportSecurity() bool {
	// Return false instead only when developing or testing without TLS.
	return true
}

// main connects to Lattice over TLS, authenticating every call with a bearer
// token, then fetches a single entity by ID and logs it.
func main() {
	ctx := context.Background()

	bearerToken := "$YOUR_BEARER_TOKEN"
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
		grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
	}
	conn, err := grpc.NewClient("$YOUR_LATTICE_URL", opts...)
	if err != nil {
		log.Fatalf("Did not connect: %v", err)
	}
	defer conn.Close()

	em := entitymanagerv1.NewEntityManagerAPIClient(conn)

	entityID := "test.entity.1"
	resp, err := em.GetEntity(ctx, &entitymanagerv1.GetEntityRequest{
		EntityId: entityID,
	})
	if err != nil {
		log.Fatalf("Error getting entity: %v, %v", entityID, err)
	}
	// Log the entity through its pointer instead of dereferencing it:
	// copying a generated protobuf message by value also copies its internal
	// state (flagged by go vet), and a dereference would panic if the
	// response carried no entity.
	log.Printf("Received entity: %v", resp.GetEntity())
}

Stream entities

If you need to watch a real-time feed of entities, then you can use the StreamEntityComponents API. This API delivers a stream of entity events as the state of the common operational picture changes.

package main
import (
"context"
"log"

entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// BearerTokenAuth holds a bearer token and attaches it to each RPC as
// per-RPC credentials.
type BearerTokenAuth struct {
	Token string
}

// GetRequestMetadata returns the metadata to send with a request: a single
// "authorization" entry carrying the bearer token. It never returns an error.
func (auth *BearerTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	md := make(map[string]string, 1)
	md["authorization"] = "Bearer " + auth.Token
	return md, nil
}

// RequireTransportSecurity reports whether these credentials must be sent
// over a secure transport.
func (auth *BearerTokenAuth) RequireTransportSecurity() bool {
	// Return false instead only when developing or testing without TLS.
	return true
}

// main opens an authenticated TLS connection to Lattice and logs every
// entity event delivered by the StreamEntityComponents API, requesting all
// components of each entity.
func main() {
	// The bearer token is attached to every RPC made on the connection.
	token := "$YOUR_BEARER_TOKEN"
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
		grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: token}),
	}

	conn, err := grpc.NewClient("$YOUR_LATTICE_URL", dialOpts...)
	if err != nil {
		log.Fatalf("Did not connect: %v", err)
	}
	defer conn.Close()

	client := entitymanagerv1.NewEntityManagerAPIClient(conn)

	// Request events for all entities, with every component included.
	req := &entitymanagerv1.StreamEntityComponentsRequest{
		IncludeAllComponents: true,
	}
	stream, err := client.StreamEntityComponents(context.Background(), req)
	if err != nil {
		log.Fatalf("Error initiating entity stream: %v", err)
	}

	// Read the stream one event at a time; any receive error ends the program.
	for {
		msg, recvErr := stream.Recv()
		if recvErr != nil {
			log.Fatalf("Error receiving from entity stream: %v", recvErr)
		}
		log.Printf("Received entity event: %v", msg.GetEntityEvent())
	}
}

Stream specific components

You can specify a subset of components to stream from the entity object by providing the component in snake_case. The StreamEntityComponents API will then only stream changes to those components:

Use the Lattice SDK for Go to stream entities with only the location_uncertainty and power_state components included. The caller can expect the stream to produce events only when these components change.

package main
import (
"context"
"log"

entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// BearerTokenAuth holds a bearer token and attaches it to each RPC as
// per-RPC credentials.
type BearerTokenAuth struct {
	Token string
}

// GetRequestMetadata returns the metadata to send with a request: a single
// "authorization" entry carrying the bearer token. It never returns an error.
func (auth *BearerTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	md := make(map[string]string, 1)
	md["authorization"] = "Bearer " + auth.Token
	return md, nil
}

// RequireTransportSecurity reports whether these credentials must be sent
// over a secure transport.
func (auth *BearerTokenAuth) RequireTransportSecurity() bool {
	// Return false instead only when developing or testing without TLS.
	return true
}

// main opens an authenticated TLS connection to Lattice and logs entity
// events from the StreamEntityComponents API, restricted to the
// location_uncertainty and power_state components.
func main() {
	// The bearer token is attached to every RPC made on the connection.
	token := "$YOUR_BEARER_TOKEN"
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
		grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: token}),
	}

	conn, err := grpc.NewClient("$YOUR_LATTICE_URL", dialOpts...)
	if err != nil {
		log.Fatalf("Did not connect: %v", err)
	}
	defer conn.Close()

	client := entitymanagerv1.NewEntityManagerAPIClient(conn)

	// Component names are given in snake_case; only these two are requested.
	req := &entitymanagerv1.StreamEntityComponentsRequest{
		ComponentsToInclude: []string{"location_uncertainty", "power_state"},
	}
	stream, err := client.StreamEntityComponents(context.Background(), req)
	if err != nil {
		log.Fatalf("Error initiating entity stream: %v", err)
	}

	// Read the stream one event at a time; any receive error ends the program.
	for {
		msg, recvErr := stream.Recv()
		if recvErr != nil {
			log.Fatalf("Error receiving from entity stream: %v", recvErr)
		}
		log.Printf("Received entity event: %v", msg.GetEntityEvent())
	}
}

Apply filtering

note

This feature is currently only available using Lattice SDKs for gRPC.

The StreamEntityComponents API lets you filter on any field of an entity component.

Here's an example of an OR predicate filter. It matches an entity if mil_view.disposition is SUSPICIOUS or HOSTILE.

Note that disposition is an enum, so the JSON value is just a number.

{
"Operation": {
"Or": {
"Children": {
"PredicateSet": {
"predicates": [
{
"field_path": "mil_view.disposition",
"value": {
"Type": {
"EnumType": {
"value": 2
}
}
},
"comparator": 1
},
{
"field_path": "mil_view.disposition",
"value": {
"Type": {
"EnumType": {
"value": 3
}
}
},
"comparator": 1
}
]
}
}
}
}
}

Using the above filter definition in Go, we can stream entities that match this filter.

package main
import (
	"context"
	"log"

	entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
	ontologyv1 "github.com/anduril/lattice-sdk-go/src/anduril/ontology/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// BearerTokenAuth holds a bearer token and attaches it to each RPC as
// per-RPC credentials.
type BearerTokenAuth struct {
	Token string
}

// GetRequestMetadata returns the metadata to send with a request: a single
// "authorization" entry carrying the bearer token. It never returns an error.
func (auth *BearerTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	md := make(map[string]string, 1)
	md["authorization"] = "Bearer " + auth.Token
	return md, nil
}

// RequireTransportSecurity reports whether these credentials must be sent
// over a secure transport.
func (auth *BearerTokenAuth) RequireTransportSecurity() bool {
	// Return false instead only when developing or testing without TLS.
	return true
}

func main(){
ctx := context.Background()

bearerToken := "$YOUR_BEARER_TOKEN"
opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
}
conn, err := grpc.NewClient("$YOUR_LATTICE_URL", opts...)
if err != nil {
log.Fatalf("Did not connect: %v", err)
}
defer conn.Close()

em := entitymanagerv1.NewEntityManagerAPIClient(conn)

// Filter Statement that defines 2 OR predicates that filter suspicious or
// hostile entities
filter := &entitymanagerv1.Statement{
Operation: &entitymanagerv1.Statement_Or{
Or: &entitymanagerv1.OrOperation{
Children: &entitymanagerv1.OrOperation_PredicateSet{
PredicateSet: &entitymanagerv1.PredicateSet{
Predicates: []*entitymanagerv1.Predicate{
{
FieldPath: "mil_view.disposition",
Value: &entitymanagerv1.Value{
Type: &entitymanagerv1.Value_EnumType{
EnumType: &entitymanagerv1.EnumType{
Value: int32(ontologyv1.Disposition_DISPOSITION_HOSTILE),
},
},
},
Comparator: entitymanagerv1.Comparator_COMPARATOR_EQUALITY,
},
{
FieldPath: "mil_view.disposition",
Value: &entitymanagerv1.Value{
Type: &entitymanagerv1.Value_EnumType{
EnumType: &entitymanagerv1.EnumType{
Value: int32(ontologyv1.Disposition_DISPOSITION_SUSPICIOUS),
},
},
},
Comparator: entitymanagerv1.Comparator_COMPARATOR_EQUALITY,
},
},
},
},
},
},
}
streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
Filter: filter,
IncludeAllComponents: true,
})
if err != nil {
log.Fatalf("Error initiating entity stream: %v", err)
}
for {
// Receive single entity event response
resp, err := streamClient.Recv()
if err != nil {
log.Fatalf("Error receiving from entity stream: %v", err)
}
log.Printf("Received filtered entity event: %v", resp.GetEntityEvent())
}
}