Watch entities

This page explains how to get and stream information about entities in your environment. You can get real-time data about a single entity, or create a continuous stream of data from entities in Lattice.

For more information about other API actions that let you receive entity data, see Get Entity (gRPC | HTTP) and Stream Entity (gRPC | HTTP).

Before you begin

To configure your application to watch entities, set up your Lattice environment.

Get an entity

Get details of a specific entity using its entity_id and the GetEntity request in gRPC or HTTP.

Find the entity ID

Find an entity's entity_id in the Lattice UI:

  1. Open the Lattice UI using your environment URL: $YOUR_LATTICE_URL.
  2. In Lattice, locate the entity you want to watch, and select it.
  3. From the entity toolbar, click ... to open more options, then select Copy Content from the drop-down list.
  4. Select Copy URL to copy the URL to your clipboard.
  5. Note the entity ID at the end of the URL and save it: https://$YOUR_LATTICE_URL.com/c2/entities/$ENTITY_ID.

You'll replace the $ENTITY_ID placeholder in the following example with this entity ID.
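
If you prefer to extract the ID programmatically rather than by eye, the following is a minimal sketch that takes the last path segment of the copied URL. The helper name entityIDFromURL and the host lattice.example.com are hypothetical, and the sketch assumes the URL ends in /entities/ followed by the entity ID, as in step 5.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// entityIDFromURL returns the last path segment of a copied entity URL.
// Hypothetical helper: it assumes the URL ends in /entities/<entity ID>.
func entityIDFromURL(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", fmt.Errorf("parse entity URL: %w", err)
	}
	// path.Base returns "." for an empty path and "/" for a root path.
	id := path.Base(u.Path)
	if id == "." || id == "/" {
		return "", fmt.Errorf("no entity ID found in %q", raw)
	}
	return id, nil
}

func main() {
	// Placeholder URL for illustration only.
	id, err := entityIDFromURL("https://lattice.example.com/c2/entities/AAAACAAAA1111AAAAAA11A1")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(id)
}
```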

Run the application

  1. Copy and paste the following code into two files: main.go and lattice_client.go:

    main.go
    package main

    import (
        "context"
    )

    // BearerTokenAuth supplies PerRPCCredentials from a given token.
    type BearerTokenAuth struct {
        Token string
    }

    // GetRequestMetadata gets the current request metadata, adding the bearer token.
    func (b *BearerTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
        return map[string]string{
            "authorization": "Bearer " + b.Token,
        }, nil
    }

    // RequireTransportSecurity indicates whether the credentials require transport security.
    func (b *BearerTokenAuth) RequireTransportSecurity() bool {
        return true // or false if you are developing/testing without TLS
    }

    func main() {
        App()
    }
    lattice_client.go
    package main

    import (
        "context"
        "log"
        "os"

        entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials"
    )

    func App() {
        // Retrieve the bearer token and lattice URL from environment variables.
        bearerToken, ok := os.LookupEnv("BEARER_TOKEN")
        if !ok || bearerToken == "" {
            log.Fatalf("BEARER_TOKEN environment variable is not set or is empty.")
        }

        latticeURL, ok := os.LookupEnv("LATTICE_URL")
        if !ok || latticeURL == "" {
            log.Fatalf("LATTICE_URL environment variable is not set or is empty.")
        }

        opts := []grpc.DialOption{
            grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
            grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
        }
        conn, err := grpc.NewClient(latticeURL, opts...)

        if err != nil {
            log.Fatalf("did not connect: %v", err)
        }
        defer conn.Close()

        em := entitymanagerv1.NewEntityManagerAPIClient(conn)

        ctx := context.Background()
        request := &entitymanagerv1.GetEntityRequest{
            EntityId: "$ENTITY_ID", // Replace $ENTITY_ID with any active entity ID.
        }
        response, err := em.GetEntity(ctx, request)

        if err != nil {
            log.Fatalf("Error fetching entity: %v", err)
        }
        log.Printf("Response: %v", response)
    }

    Your project structure should now look similar to the following:

    ├── go.mod
    ├── go.sum
    ├── lattice_client.go
    └── main.go
  2. Clean up your go.mod file and construct a vendor directory to store all your dependencies:

    go mod tidy
    go mod vendor
  3. Build and run the application:

    go build
    go run .

Verify the response

If successful, you'll see the entity represented as a Protobuf message in the response, similar to the following:

entity {
entity_id: "AAAACAAAA1111AAAAAA11A1"
description: "Asset 62b0 from SOURCE_ANDURIL"
is_live: true
created_time {
seconds: 1738266966
nanos: 369166124
}
expiry_time {
seconds: 1738446773
nanos: 690232366
}
status {
platform_activity: "Auto"
}
.
.
.
}

If you see an error message similar to "entity not found AAAACAAAA1111AAAAAA11A1", you successfully authenticated and connected to Lattice, but no entity with the provided ID exists. This error is expected when the ID does not match an existing entity. If you get any other error, check your code and make sure that your Lattice environment URL and bearer token are correct.

Allow self-signed certificates

warning

Make sure you understand the implications of using self-signed certificates. This should only be used for testing.

To connect to a server that uses a self-signed certificate, replace the transport credentials in your gRPC dial options as shown in the following:

lattice_client.go
// Add "crypto/tls" to the import block, then remove the following
// statement and replace it with a new transport credential declaration.
.
.
.
- grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
+ grpc.WithTransportCredentials(credentials.NewTLS(
+     &tls.Config{
+         InsecureSkipVerify: true,
+     })),
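
As an alternative to skipping verification entirely, you can trust the server's certificate explicitly. The following is a minimal sketch, assuming you have the self-signed certificate (or its CA) in PEM form; the helper name tlsConfigFromPEM is hypothetical. It builds a tls.Config whose root pool contains only that certificate, using the standard library alone.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// tlsConfigFromPEM builds a TLS config that trusts only the certificates
// found in pemBytes. Hypothetical helper for illustration.
func tlsConfigFromPEM(pemBytes []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	// AppendCertsFromPEM reports false when no certificate could be parsed.
	if !pool.AppendCertsFromPEM(pemBytes) {
		return nil, errors.New("no valid certificates found in PEM data")
	}
	return &tls.Config{RootCAs: pool, MinVersion: tls.VersionTLS12}, nil
}

func main() {
	// In a real program, read the PEM data from a file of your choosing.
	// Invalid input is used here to show the error path.
	_, err := tlsConfigFromPEM([]byte("not a certificate"))
	fmt.Println("error:", err)
}
```

You could then pass the resulting config to credentials.NewTLS in place of the InsecureSkipVerify configuration, which keeps certificate verification enabled during testing.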

Stream entities

To get a real-time stream of all entities, use the StreamEntityComponents API action to instantiate the stream. You can create a new file, or replace the existing code in lattice_client.go as shown:

lattice_client.go
streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
    IncludeAllComponents: true,
})
lattice_client.go
package main

import (
"context"
"log"
"os"

entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

func App() {
// Retrieve the bearer token and lattice URL from environment variables.
bearerToken, ok := os.LookupEnv("BEARER_TOKEN")
if !ok || bearerToken == "" {
log.Fatalf("BEARER_TOKEN environment variable is not set or is empty.")
}

latticeURL, ok := os.LookupEnv("LATTICE_URL")
if !ok || latticeURL == "" {
log.Fatalf("LATTICE_URL environment variable is not set or is empty.")
}
ctx := context.Background()

opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
}
conn, err := grpc.NewClient(latticeURL, opts...)

if err != nil {
log.Fatalf("Did not connect: %v", err)
}
defer conn.Close()

em := entitymanagerv1.NewEntityManagerAPIClient(conn)

// Start stream for all Entity Events including all components in the entity.
streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
IncludeAllComponents: true,
})
if err != nil {
log.Fatalf("Error initiating entity stream: %v", err)
}
for {
// Receive single entity event response.
resp, err := streamClient.Recv()
if err != nil {
log.Fatalf("Error receiving from entity stream: %v", err)
}
log.Printf("Received entity event: %v", resp.GetEntityEvent())
}
}
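
The receive loop above calls log.Fatalf on any error, which exits the process immediately and skips the deferred conn.Close(). The following is a minimal sketch of a loop that returns errors to the caller instead and treats io.EOF as a normal end of stream, which is how gRPC streams in Go signal completion. The receiver interface, the drain function, and fakeStream are hypothetical stand-ins so the example runs without a Lattice connection; Go 1.18 or later is assumed for generics.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// receiver is a hypothetical stand-in for the stream client; it mirrors only Recv.
type receiver[T any] interface {
	Recv() (T, error)
}

// drain calls handle for each message until the stream ends.
// It returns nil on io.EOF and a wrapped error otherwise.
func drain[T any](s receiver[T], handle func(T)) error {
	for {
		msg, err := s.Recv()
		if errors.Is(err, io.EOF) {
			return nil // The stream finished cleanly.
		}
		if err != nil {
			return fmt.Errorf("receive from stream: %w", err)
		}
		handle(msg)
	}
}

// fakeStream replays canned messages, then returns io.EOF.
type fakeStream struct{ msgs []string }

func (f *fakeStream) Recv() (string, error) {
	if len(f.msgs) == 0 {
		return "", io.EOF
	}
	m := f.msgs[0]
	f.msgs = f.msgs[1:]
	return m, nil
}

func main() {
	s := &fakeStream{msgs: []string{"event-1", "event-2"}}
	err := drain[string](s, func(m string) { fmt.Println("received:", m) })
	fmt.Println("done, err =", err)
}
```

Returning the error lets the caller decide whether to log, retry, or exit, and lets deferred cleanup run.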

Stream specific components

You can specify a subset of components to stream from the entity object by providing the component in snake_case. The StreamEntityComponents API only streams changes to the specified components:

Use Lattice SDK for Go to stream entities containing only the location_uncertainty and power_state components. The resulting stream produces events when any component changes, but includes only the specified components:

lattice_client.go
// Start stream for Entity Events containing a change in the following components: location_uncertainty, power_state
streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
    ComponentsToInclude: []string{"location_uncertainty", "power_state"},
})
lattice_client.go
package main

import (
"context"
"log"
"os"

entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

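// Note: this version of the file is self-contained. It declares BearerTokenAuth
// and main itself, so remove main.go from the earlier steps before you build;
// otherwise Go reports duplicate declarations in package main.

// BearerTokenAuth supplies PerRPCCredentials from a given token.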
type BearerTokenAuth struct {
Token string
}

// GetRequestMetadata gets the current request metadata, adding the bearer token.
func (b *BearerTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "Bearer " + b.Token,
}, nil
}

// RequireTransportSecurity indicates whether the credentials require transport security.
func (b *BearerTokenAuth) RequireTransportSecurity() bool {
return true // or false if you are developing/testing without TLS.
}

func main() {
// Retrieve the bearer token and lattice URL from environment variables.
bearerToken, ok := os.LookupEnv("BEARER_TOKEN")
if !ok || bearerToken == "" {
log.Fatalf("BEARER_TOKEN environment variable is not set or is empty.")
}

latticeURL, ok := os.LookupEnv("LATTICE_URL")
if !ok || latticeURL == "" {
log.Fatalf("LATTICE_URL environment variable is not set or is empty.")
}

ctx := context.Background()

opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
}
conn, err := grpc.NewClient(latticeURL, opts...)

if err != nil {
log.Fatalf("Did not connect: %v", err)
}
defer conn.Close()

em := entitymanagerv1.NewEntityManagerAPIClient(conn)

// Start stream for events whenever any components change, but include only the following
// in the response: location_uncertainty, power_state
streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
ComponentsToInclude: []string{"location_uncertainty", "power_state"},
})
if err != nil {
log.Fatalf("Error initiating entity stream: %v", err)
}

for {
// Receive single entity event response
resp, err := streamClient.Recv()
if err != nil {
log.Fatalf("Error receiving from entity stream: %v", err)
}
log.Printf("Received entity event: %v", resp.GetEntityEvent())
}
}
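
Because component names must be given in snake_case, a small check before opening the stream can catch spelling mistakes such as PowerState early. The following is a minimal sketch; the helper invalidComponents is hypothetical, and it only checks the spelling format, not whether a component with that name exists.

```go
package main

import (
	"fmt"
	"regexp"
)

// snakeCase matches lower snake_case names such as "power_state".
var snakeCase = regexp.MustCompile(`^[a-z][a-z0-9]*(_[a-z0-9]+)*$`)

// invalidComponents returns the names that are not lower snake_case.
// Hypothetical helper for illustration.
func invalidComponents(names []string) []string {
	var bad []string
	for _, n := range names {
		if !snakeCase.MatchString(n) {
			bad = append(bad, n)
		}
	}
	return bad
}

func main() {
	names := []string{"location_uncertainty", "power_state", "PowerState"}
	fmt.Println("invalid names:", invalidComponents(names))
}
```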

Apply filters

note

This feature is only available using Lattice SDKs for gRPC.

The StreamEntityComponents API lets you filter on any field of an entity component.

OR operator

The following is an example of an OR predicate filter. This filter matches an entity if mil_view.disposition is SUSPICIOUS or HOSTILE. The disposition value is an enum, and the JSON value corresponds to a number:

lattice_client.go
package main

import (
"context"
"log"
"os"

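entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
// ontologyv1 provides the Disposition enum used in the filter below. The path
// is assumed to mirror the entitymanager path; verify it against your SDK version.
ontologyv1 "github.com/anduril/lattice-sdk-go/src/anduril/ontology/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"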
)

func App() {
// Retrieve the bearer token and lattice URL from environment variables
bearerToken, ok := os.LookupEnv("BEARER_TOKEN")
if !ok || bearerToken == "" {
log.Fatalf("BEARER_TOKEN environment variable is not set or is empty.")
}

latticeURL, ok := os.LookupEnv("LATTICE_URL")
if !ok || latticeURL == "" {
log.Fatalf("LATTICE_URL environment variable is not set or is empty.")
}

ctx := context.Background()

opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
}
conn, err := grpc.NewClient(latticeURL, opts...)

if err != nil {
log.Fatalf("Did not connect: %v", err)
}
defer conn.Close()

em := entitymanagerv1.NewEntityManagerAPIClient(conn)

// Filter Statement that defines 2 OR predicates that filter suspicious or
// hostile entities.
filter := &entitymanagerv1.Statement{
    Operation: &entitymanagerv1.Statement_Or{
        Or: &entitymanagerv1.OrOperation{
            Children: &entitymanagerv1.OrOperation_PredicateSet{
                PredicateSet: &entitymanagerv1.PredicateSet{
                    Predicates: []*entitymanagerv1.Predicate{
                        {
                            FieldPath: "mil_view.disposition",
                            Value: &entitymanagerv1.Value{
                                Type: &entitymanagerv1.Value_EnumType{
                                    EnumType: &entitymanagerv1.EnumType{
                                        Value: int32(ontologyv1.Disposition_DISPOSITION_HOSTILE),
                                    },
                                },
                            },
                            Comparator: entitymanagerv1.Comparator_COMPARATOR_EQUALITY,
                        },
                        {
                            FieldPath: "mil_view.disposition",
                            Value: &entitymanagerv1.Value{
                                Type: &entitymanagerv1.Value_EnumType{
                                    EnumType: &entitymanagerv1.EnumType{
                                        Value: int32(ontologyv1.Disposition_DISPOSITION_SUSPICIOUS),
                                    },
                                },
                            },
                            Comparator: entitymanagerv1.Comparator_COMPARATOR_EQUALITY,
                        },
                    },
                },
            },
        },
    },
}
streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
Filter: filter,
IncludeAllComponents: true,
})
if err != nil {
log.Fatalf("Error initiating entity stream: %v", err)
}
for {
// Receive single entity event response.
resp, err := streamClient.Recv()
if err != nil {
log.Fatalf("Error receiving from entity stream: %v", err)
}
log.Printf("Received filtered entity event: %v", resp.GetEntityEvent())
}
}
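
To make the matching rule concrete, the following is a simplified local model of an OR over equality predicates, not the SDK's filter types. An entity matches when at least one predicate holds. The predicate struct, the matchesAny function, and the enum numbers are placeholders chosen for this sketch only; they are not the real Disposition values.

```go
package main

import "fmt"

// predicate is a simplified local model of an equality predicate:
// the field at path must equal value. It is not the SDK's Predicate type.
type predicate struct {
	path  string
	value int32
}

// matchesAny reports whether at least one predicate holds for the entity,
// modelling an OR over a predicate set.
func matchesAny(entity map[string]int32, preds []predicate) bool {
	for _, p := range preds {
		if v, ok := entity[p.path]; ok && v == p.value {
			return true
		}
	}
	return false
}

func main() {
	// Placeholder enum numbers for this sketch only.
	const (
		hostile    int32 = 1
		suspicious int32 = 2
		other      int32 = 3
	)
	filter := []predicate{
		{path: "mil_view.disposition", value: hostile},
		{path: "mil_view.disposition", value: suspicious},
	}
	fmt.Println(matchesAny(map[string]int32{"mil_view.disposition": suspicious}, filter))
	fmt.Println(matchesAny(map[string]int32{"mil_view.disposition": other}, filter))
}
```

The first call matches because the second predicate holds; the second call matches neither predicate.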

List operator

The following is an example of a List predicate filter. This filter matches all entities being tracked by a specific entity:

MyEntityManagerClient.java
// You might need to modify this statement depending
// on the location of your file relative to the project.
package org.example;

import com.anduril.entitymanager.v1.Comparator;
import com.anduril.entitymanager.v1.EntityManagerAPIGrpc;
import com.anduril.entitymanager.v1.EntityManagerAPIGrpc.EntityManagerAPIStub;
import com.anduril.entitymanager.v1.ListComparator;
import com.anduril.entitymanager.v1.ListOperation;
import com.anduril.entitymanager.v1.OrOperation;
import com.anduril.entitymanager.v1.Predicate;
import com.anduril.entitymanager.v1.Statement;
import com.anduril.entitymanager.v1.StatementSet;
import com.anduril.entitymanager.v1.StreamEntityComponentsRequest;
import com.anduril.entitymanager.v1.StreamEntityComponentsResponse;
import com.anduril.entitymanager.v1.StringType;
import com.anduril.entitymanager.v1.Value;

import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;

/**
* This class represents a client for interacting with the EntityManager API. It
* uses gRPC to communicate with the server and provides methods for retrieving
* entities.
*/
public class MyEntityManagerClient {

/**
* The stub for the EntityManagerAPI. This is the client-side implementation
* of the API interface.
*/
private EntityManagerAPIStub serviceStub;

/**
* Constructs a new instance of the client. Creates a channel to connect to
* the server using the URL of the host. Creates an interceptor to apply
* authorization headers to requests. Initializes the service stub with the
* channel and interceptor.
*/
public MyEntityManagerClient() {
String bearerToken = System.getenv("BEARER_TOKEN");
String latticeUrl = System.getenv("LATTICE_URL");

if (bearerToken == null || latticeUrl == null) {
System.err.println("Make sure your Lattice URL and bearer token have been set as system environment variables.");
System.exit(1);
}

// Create a channel object to connect to Lattice using your environment's URL
ManagedChannel channel = ManagedChannelBuilder.forAddress(latticeUrl, 443).useTransportSecurity()
.build();

// Create a metadata object to apply an authorization header to requests.
Metadata header = new Metadata();
Metadata.Key<String> key = Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
header.put(key, "Bearer " + bearerToken);

// Create an interceptor to apply authorization headers to requests.
ClientInterceptor interceptor = MetadataUtils.newAttachHeadersInterceptor(header);

// Initialize the service stub with the channel and interceptor.
this.serviceStub = EntityManagerAPIGrpc.newStub(channel).withInterceptors(interceptor);
}

/**
* Streams all entities that are being tracked by a specified entity.
*
* @param entityId The ID of the specified entity that is tracking others.
* @throws InterruptedException If the thread is interrupted while waiting
* for the response.
*/
public void streamFilteredEntities(String entityId) throws InterruptedException {
// Creates a StreamObserver to handle the response from the server.
StreamObserver<StreamEntityComponentsResponse> responseObserver = new StreamObserver<StreamEntityComponentsResponse>() {
// Invoked when a new message is received from Lattice.
@Override
public void onNext(StreamEntityComponentsResponse value) {
// Print the received value.
System.out.println(value);
}

// Invoked when an error occurs during the stream operation.
@Override
public void onError(Throwable t) {
// Print the stack trace for the error.
t.printStackTrace();
}

// Invoked when the stream operation is completed.
@Override
public void onCompleted() {
// Print a message indicating the stream is finished.
System.out.println("Finished receiving data from server");
}
};

// Define the comparator statement for your filter. Set the field path
// to entity_id in order to match the entity that is tracking others
// in your environment.
Statement entityIdStatement = Statement.newBuilder()
.setPredicate(Predicate.newBuilder()
.setFieldPath("entity_id")
.setComparator(Comparator.COMPARATOR_CASE_INSENSITIVE_EQUALITY)
.setValue(Value.newBuilder()
.setStringType(StringType.newBuilder()
.setValue(entityId)
.build()))
.build())
.build();

// Define the list comparator statement for your filter. Set the list path
// to relationships.relationships in order to match any entity that has a
// relationship whose related_entity_id equals the specified entity ID.
Statement trackingEntityStatement = Statement.newBuilder()
.setList(ListOperation.newBuilder()
.setListPath("relationships.relationships")
.setListComparator(ListComparator.LIST_COMPARATOR_ANY_OF)
.setStatement(Statement.newBuilder()
.setPredicate(Predicate.newBuilder()
.setFieldPath("related_entity_id")
.setComparator(Comparator.COMPARATOR_CASE_INSENSITIVE_EQUALITY)
.setValue(Value.newBuilder()
.setStringType(StringType.newBuilder()
.setValue(entityId)
.build()))))
.build())
.build();

// Create a statement set using the two previously defined statements.
StatementSet statements = StatementSet.newBuilder()
.addStatements(entityIdStatement)
.addStatements(trackingEntityStatement)
.build();

// Create an OrOperation filter type.
OrOperation filterOperation = OrOperation.newBuilder()
.setStatementSet(statements)
.build();
Statement filter = Statement.newBuilder()
.setOr(filterOperation)
.build();

// Stream and filter the entities from Lattice.
StreamEntityComponentsRequest request = StreamEntityComponentsRequest.newBuilder()
.setFilter(filter)
.setIncludeAllComponents(true)
.build();

// Sends the request to the server and passes in the response observer to handle
// the response.
this.serviceStub.streamEntityComponents(request, responseObserver);
}
}
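
The list comparator in the Java example can be pictured with a small local model, written here in Go to match the earlier examples. The following is a minimal sketch, not SDK code: an ANY_OF comparison succeeds when at least one element of the list satisfies the inner predicate. The relationship struct, the anyRelatedTo function, and the sample IDs are hypothetical, and strings.EqualFold is only an approximation of whatever case-insensitive comparison the server applies.

```go
package main

import (
	"fmt"
	"strings"
)

// relationship is a simplified local model of one entry in an entity's
// relationship list. It is not the SDK type.
type relationship struct {
	relatedEntityID string
}

// anyRelatedTo reports whether any relationship points at id, compared
// case-insensitively, modelling an ANY_OF list comparator over an
// equality predicate.
func anyRelatedTo(rels []relationship, id string) bool {
	for _, r := range rels {
		if strings.EqualFold(r.relatedEntityID, id) {
			return true
		}
	}
	return false
}

func main() {
	// Sample IDs for illustration only.
	rels := []relationship{{relatedEntityID: "tracker-a"}, {relatedEntityID: "TRACKER-B"}}
	fmt.Println(anyRelatedTo(rels, "tracker-b"))
	fmt.Println(anyRelatedTo(rels, "tracker-c"))
}
```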