Watch entities
This page explains getting and streaming information about entities in your environment. You can get real-time data about a single entity, or create a stream continuous data from entities in Lattice
For more information about other API actions that let you receive entity data, see Get Entity (gRPC | HTTP) and Stream Entity (gRPC | HTTP).
Before you begin
To configure your application to watch entities, set up your Lattice environment.
Get an entity
Get details of a specific entity using an entity_ID
and the GetEntity request in gRPC or HTTP.
Find the entity ID
Find an entity's entity_id
in the Lattice UI:
- Open the Lattice UI using your environment URL:
$YOUR_LATTICE_URL
. - In Lattice, locate the entity you want to watch, and select it.
- From the entity toolbar, click ... to open more options, then select Copy Content from the drop down list.
- Select Copy URL and copy the URL to your clipboard.
- Note the entity at the end of the URL and save it:
https://$YOUR_LATTICE_URL.com/c2/entities/$ENTITY_ID
.
You'll replace the $ENTITY_ID
placeholder in the following example with this entity ID.
Run the application
- Go
- Rust
- C++
- Java
- JavaScript
- Python
- curl
-
Copy and paste the following code into two files:
main.go
andlattice_client.go
:main.go
main.gopackage main
import (
"context"
)
// BearerTokenAuth supplies PerRPCCredentials from a given token.
type BearerTokenAuth struct {
Token string
}
// GetRequestMetadata gets the current request metadata, adding the bearer token.
func (b *BearerTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "Bearer " + b.Token,
}, nil
}
// RequireTransportSecurity indicates whether the credentials requires transport security.
func (b *BearerTokenAuth) RequireTransportSecurity() bool {
return true // or false if you are developing/testing without TLS
}
func main() {
App()
}lattice_client.go
lattice_client.gopackage main
import (
"context"
"log"
"os"
entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
func App() {
// Retrieve the bearer token and lattice URL from environment variables.
bearerToken, ok := os.LookupEnv("BEARER_TOKEN")
if !ok || bearerToken == "" {
log.Fatalf("BEARER_TOKEN environment variable is not set or is empty.")
}
latticeURL, ok := os.LookupEnv("LATTICE_URL")
if !ok || latticeURL == "" {
log.Fatalf("LATTICE_URL environment variable is not set or is empty.")
}
opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
}
conn, err := grpc.NewClient(latticeURL, opts...)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
em := entitymanagerv1.NewEntityManagerAPIClient(conn)
ctx := context.Background()
request := &entitymanagerv1.GetEntityRequest{
EntityId: "$ENTITY_ID", // Replace $ENTITY_ID with any active entity ID.
}
response, err := em.GetEntity(ctx, request)
if err != nil {
log.Fatalf("Error fetching entity: %v", err)
}
log.Printf("Response: %v", response)
}Your project structure should now look similar to the following:
├── go.mod
├── go.sum
├── lattice_client.go
└── main.go -
Clean up your
go.mod
file and construct avendor
directory to store all your dependencies:go mod tidy
go mod vendor -
Build and run the application:
go build
go run .
-
Copy and paste the following example code into your Rust project folder in a new file named
main.rs
. You'll use this file to define your application and connect to Lattice:main.rs
main.rsuse tonic::metadata::MetadataValue;
use tonic::transport::ClientTlsConfig;
use tonic::transport::Channel;
use tonic::Status;
use tonic::Request;
use anduril_lattice_sdk::anduril::entitymanager::v1::entity_manager_api_client::EntityManagerApiClient;
use anduril_lattice_sdk::anduril::entitymanager::v1::GetEntityRequest;
use std::env;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Retrieve your token and Lattice URL from system environment variables.
let token = env::var("BEARER_TOKEN").expect("BEARER_TOKEN is not set in environment variables");
let url = env::var("LATTICE_URL").expect("LATTICE_URL is not set in environment variables");
let bearer_token = format!("Bearer {}", token);
let lattice_url = format!("{}", url);
let header_value: MetadataValue<_> = bearer_token.parse().map_err(|_| Status::internal("Invalid Bearer Token"))?;
let tls_config = ClientTlsConfig::new().with_native_roots();
let registration_channel = Channel::from_shared(lattice_url)
.map_err(|e| Status::invalid_argument(format!("Invalid HTTP endpoint: {}", e)))?
.tls_config(tls_config)
.map_err(|_| Status::internal("TLS configuration failed"))?
.connect()
.await
.map_err(|e| Status::unavailable(format!("Failed to connect: {}", e)))?;
let mut em_client = EntityManagerApiClient::with_interceptor(registration_channel, |mut req: Request<()>| {
req.metadata_mut().insert("authorization", header_value.clone());
Ok(req)
});
let response = em_client.get_entity(GetEntityRequest {
// Replace $ENTITY_ID with the ID of the
// entity you want to retrieve.
entity_id: String::from("$ENTITY_ID")
}).await?.into_inner();
println!("Response: {:?}", response);
Ok(())
}Your project structure should now look similar to the following:
├── src
│ └── main.rs
├── .gitignore
└── Cargo.toml -
Run your project and connect to Lattice:
cargo run
-
Copy and paste the following example code into your C++ project folder in a new file named
main.cpp
. You'll use this file to define your application and connect to Lattice:main.cpp
main.cpp#include <cstdlib>
#include <string>
#include <grpc/grpc.h>
#include <grpcpp/channel.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/security/tls_credentials_options.h>
#include <anduril/entitymanager/v1/entity_manager_grpcapi.pub.grpc.pb.h>
int main(int argc, char *argv[])
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
// Get your Lattice environment URL and bearer token.
const char *lattice_url = std::getenv("LATTICE_URL");
const char *bearer_token = std::getenv("BEARER_TOKEN");
if (!bearer_token || !lattice_url)
{
std::cerr << "Make sure your Lattice URL and bearer token have been set as system environment variables." << std::endl;
return 1;
}
// Authorize using your bearer token.
grpc::ClientContext ctx;
ctx.AddMetadata("authorization", "Bearer " + std::string(bearer_token));
// Open a secure channel using your Lattice URL
auto creds = grpc::SslCredentials(grpc::SslCredentialsOptions());
std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(lattice_url, creds);
// Create a new stub for the service instance.
std::shared_ptr<anduril::entitymanager::v1::EntityManagerAPI::Stub> mStub =
anduril::entitymanager::v1::EntityManagerAPI::NewStub(channel);
// Define the request and response objects.
anduril::entitymanager::v1::GetEntityRequest req;
// Replace ENTITY_ID with any active entity ID in your environment
req.set_entity_id("$ENTITY_ID");
anduril::entitymanager::v1::GetEntityResponse res;
// Get the entity.
grpc::Status status = mStub->GetEntity(&ctx, req, &res);
if (!status.ok())
{
std::cerr << "Error code: " << status.error_code() << std::endl;
std::cerr << "Error message: " << status.error_message() << std::endl;
}
else
{
std::cout << res.entity().DebugString() << std::endl;
}
// Automatically close the channel when out of scope
return 0;
} -
Navigate to
build/
, then build the project.cd build && make
If successful, the
make
command compiles the project and generates an executable calledcli_client
. -
Run the executable to connect to your Lattice environment.
./cli_client
-
Copy and paste the following examples into your Java project folder into two files:
App.java
, andEntityManagerClient.java
. You'll use these files to define your application and connect to Lattice:App.java
App.javapackage org.example;
import org.example.MyEntityManagerClient;
public class App {
/**
* The main method of the application. Creates an instance of
* MyEntityManagerClient, and calls its getEntity method to retrieve an
* entity. It then prints the entity ID to the console.
*/
public static void main(String[] args) {
MyEntityManagerClient entityManager = new MyEntityManagerClient();
try {
// Replace $ENTITY_ID with the ID of
// the entity you want to retrieve.
entityManager.getEntity("$ENTITY_ID");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}EntityManagerClient.java
EntityManagerClient.java// You might need to modify this statement depending on
// the location of your file relative to the project.
package org.example;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.anduril.entitymanager.v1.EntityManagerAPIGrpc;
import com.anduril.entitymanager.v1.EntityManagerAPIGrpc.EntityManagerAPIStub;
import com.anduril.entitymanager.v1.GetEntityRequest;
import com.anduril.entitymanager.v1.GetEntityResponse;
import io.grpc.ChannelCredentials;
import io.grpc.ClientInterceptor;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.TlsChannelCredentials;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.grpc.util.AdvancedTlsX509TrustManager;
import io.grpc.util.AdvancedTlsX509TrustManager.Verification;
/**
* This class represents a client for interacting with the EntityManager API. It
* uses gRPC to communicate with the server and provides methods for retrieving
* entities.
*/
public class MyEntityManagerClient {
/**
* The stub for the EntityManagerAPI. This is the client-side implementation
* of the API interface.
*/
private EntityManagerAPIStub serviceStub;
/**
* Constructs a new instance of the client. Creates a channel to connect to
* the server using the URL of the host. Creates an interceptor to apply
* authorization headers to requests. Initializes the service stub with the
* channel and interceptor.
*/
public MyEntityManagerClient() {
// Retrieve your bearer token and Lattice URL from system environment variables.
String bearerToken = System.getenv("BEARER_TOKEN");
String latticeUrl = System.getenv("LATTICE_URL");
if (bearerToken == null || latticeUrl == null) {
System.err.println("Make sure your Lattice URL and bearer token have been set as system environment variables.");
System.exit(1);
}
// Creates a channel object to connect to Lattice using your environment's URL.
ManagedChannel channel = ManagedChannelBuilder.forAddress(latticeUrl, 443).useTransportSecurity().build();
// Creates a metadata object to apply an authorization header to requests.
Metadata header = new Metadata();
Metadata.Key<String> key = Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
header.put(key, "Bearer " + bearerToken);
// Attach the header to the stub.
ClientInterceptor interceptor = MetadataUtils.newAttachHeadersInterceptor(header);
this.serviceStub = EntityManagerAPIGrpc.newStub(channel).withInterceptors(interceptor);
}
/**
* Retrieves an entity by its ID.
*
* @param entityId the ID of the entity to retrieve.
* @throws InterruptedException if the operation is interrupted.
*/
public void getEntity(String entityId) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
this.serviceStub.getEntity(GetEntityRequest.newBuilder().setEntityId(entityId).build(), new StreamObserver<GetEntityResponse>() {
@Override
public void onNext(GetEntityResponse value) {
System.out.println(value);
}
@Override
public void onError(Throwable t) {
Logger.getLogger(MyEntityManagerClient.class.getName()).log(Level.SEVERE, null, t);
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
});
latch.await();
}
}Your project structure should now look similar to the following:
├── gradle
│ ├── libs.versions.toml
│ └── wrapper
│ ├── gradle-wrapper.jar
│ └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── settings.gradle
└── app
├── build.gradle
└── src
├── main
│ └── java
│ └── org
│ └── example
│ ├──App.Java
│ └──MyEntityManagerClient.java
└── test
└── java
└── org
└── example
└──AppTest.Java -
Use the
run
command to build the project and connect to your Lattice environment:./gradlew run
-
Copy and paste the following example code into your JavaScript project folder in a new file named
App.ts
. You'll use this file to define your application and connect to Lattice:App.ts
index.tsimport { EntityManagerAPI } from "@anduril-industries/lattice-sdk/src/anduril/entitymanager/v1/entity_manager_grpcapi.pub_pb.js";
import { createGrpcTransport } from "@connectrpc/connect-node";
import { createClient } from "@connectrpc/connect";
// Retrieve the Lattice URL environment variable
const lattice_url = process.env.LATTICE_URL;
// Retrieve the bearer token environment variable.
const bearer_token = process.env.BEARER_TOKEN;
if (!lattice_url || !bearer_token) {
console.error("Make sure your Lattice URL and bearer token have been set as system environment variables.");
process.exit(1);
}
// Create the gRPC transport using your environment URL.
const transport = createGrpcTransport({
baseUrl: "https://" + lattice_url
});
async function App() {
// Create the EntityManagerAPI client using the grpc transport.
const client = createClient(EntityManagerAPI, transport);
// Set the bearer token.
const headers = new Headers();
headers.set("Authorization", "Bearer " + bearer_token);
const response = await client.getEntity(
{ entityId: "$ENTITY_ID" },
{ headers: headers }
)
console.log(response)
}
void App(); -
Run the following to complile your Typescript code to JavaScript:
npx tsc
You should see your compiled code in
App.js
. Your project structure should now look similar to the following:├── node_modules
├── App.js
├── App.ts
├── package-lock.json
├── package.json
└── tsconfig.jsonAlternatively, you can use the following command to run
tsc
in the background:npx tsc --w
The compiler will watch for file changes and automatically compile your code. Next, add this file to your
package.json
file. -
Ensure that the following command exists in your
package.json
file:package.json"scripts": {
"test": "mocha App.js",
"start": "node App.js"
} -
Run the following command to start the application:
npm run start
-
Copy and paste the following example code into your Python project folder in a new file named
app.py
. You'll use this file to define your application and connect to Lattice:app.py
app.pyfrom anduril.entitymanager.v1 import EntityManagerApiStub, GetEntityRequest
from anduril.ontology.v1 import Disposition
from grpclib.client import Channel
import asyncio
import os
import sys
# Retrieve the bearer token and lattice URL from environment variables.
bearer_token = os.getenv('BEARER_TOKEN')
lattice_url = os.getenv('LATTICE_URL')
if not bearer_token or not lattice_url:
print("Make sure your Lattice URL and bearer token have been set as system environment variables.")
sys.exit(1)
# Set the authentication header as a dictionary.
metadata = {
'authorization': f"Bearer {bearer_token}"
}
# Get an entity information.
async def get_entity(entity_id):
# Open a secure channel, using your environment URL.
channel = Channel(host=lattice_url), port=443, ssl=True)
# Create a service instance.
entity_manager_stub = EntityManagerApiStub(channel)
# Get the entity.
response = await entity_manager_stub.get_entity(GetEntityRequest(entity_id=entity_id), metadata=metadata)
# Close the channel.
channel.close()
return response
if __name__ == "__main__":
# Replace $ENTITY_ID with the ID
# of the entity you want to retrieve.
print(asyncio.run(get_entity("$ENTITY_ID"))) -
Use the
python
conmand in the same folder where you savedget_entity.py
to run the module:python get_entity.py
Use curl
in your command line tool as shown in the following:
curl -L -X GET 'https://$YOUR_LATTICE_URL/api/v1/entities/$ENTITY_ID' \\
-H 'Authorization: Bearer $YOUR_BEARER_TOKEN' \\
-H 'Accept: application/json'
Replace $ENTITY_ID with your entity's unique ID.
Verify the response
If successful, you'll see the entity represented as a Protobuf message in the response, similar to the following:
entity {
entity_id: "AAAACAAAA1111AAAAAA11A1"
description: "Asset 62b0 from SOURCE_ANDURIL"
is_live: true
created_time {
seconds: 1738266966
nanos: 369166124
}
expiry_time {
seconds: 1738446773
nanos: 690232366
}
status {
platform_activity: "Auto"
}
.
.
.
}
If you see an error message similar to the following, entity not found AAAACAAAA1111AAAAAA11A1
,
This means you successfully authenticated and connected to Lattice, but an entity with
the provided ID does not exist. This is an expected error. If you get any other error, check your code.
Make sure that your Lattice environment URL and bearer token are correct.
Allow self-signed certificates
Make sure you understand the implications of using self-signed certificates. This should only be used for testing.
In order to connect to a server that is using a self-signed certificate, replace your gRPC channel declaration as shown in the following:
- Go
- C++
- Java
- Python
// Remove the following statements, and replace with a new transport credential declaration.
.
.
.
- grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
+ grpc.WithTransportCredentials(credentials.NewTLS(
&tls.Config{
InsecureSkipVerify: true,
})),
// Add the following include statement to the top of the file.
+ #include <grpcpp/security/tls_credentials_options.h>
.
.
.
// Remove the following statements.
- auto creds = grpc::SslCredentials(grpc::SslCredentialsOptions());
- std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(url, creds);
+
// Replace with the following
+ grpc::experimental::TlsChannelCredentialsOptions authOptions;
+ authOptions.set_verify_server_certs(false);
+
+ auto verifier = std::make_shared<grpc::experimental::NoOpCertificateVerifier>();
+ authOptions.set_certificate_verifier(verifier);
+ authOptions.set_check_call_host(false);
+
+ std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(url, grpc::experimental::TlsCredentials(authOptions));
// Add the following import statement to the top of the file.
+ import io.grpc.Channel;
.
.
.
- ManagedChannel channel = ManagedChannelBuilder.forAddress(latticeUrl, 443).useTransportSecurity().build();
+
+ Channel channel;
+ try {
+ // Create a trust manager that skips all verification checks.
+ AdvancedTlsX509TrustManager clientTrustManager = AdvancedTlsX509TrustManager.newBuilder().setVerification(Verification.INSECURELY_SKIP_ALL_VERIFICATION).build();
+
+ // Create channel credentials using the trust manager.
+ ChannelCredentials channelCredentials = TlsChannelCredentials.newBuilder().trustManager(clientTrustManager).build();
+
+ // Establish a connection to Lattice using your Lattice environment URL and credentials.
+ channel = Grpc.newChannelBuilderForAddress(latticeUrl, 443, channelCredentials).build();
+ } catch (Exception e) {
+
+ // Log a helpful error message if there is an exception while generating the certificate.
+ Logger.getGlobal().log(Level.INFO, "Failed to generate Certificate- %s", e);
+ return;
+ }
# Remove the following statement, and replace it with a new channel declaration.
- channel = Channel(os.getenv('LATTICE_URL'), port=443, ssl=True)
+
+ ctx = ssl.create_default_context()
+ ctx.check_hostname = False
+ ctx.verify_mode = ssl.CERT_NONE
+
+ channel = Channel(os.getenv('LATTICE_URL'), port=443, ssl=ctx)
Stream entities
To get a real-time stream of all entities, use the StreamEntityComponents
API:
- Go
- Rust
- C++
- Java
- JavaScript
- Python
- curl
Use the StreamEntityComponents
API action to instantiate the stream. You can create a new file, or replace the existing
code in lattice_client.go
as shown:
streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
IncludeAllComponents: true,
})
lattice_client.go
package main
import (
"context"
"log"
"os"
entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
func App() {
// Retrieve the bearer token and lattice URL from environment variables.
bearerToken, ok := os.LookupEnv("BEARER_TOKEN")
if !ok || bearerToken == "" {
log.Fatalf("BEARER_TOKEN environment variable is not set or is empty.")
}
latticeURL, ok := os.LookupEnv("LATTICE_URL")
if !ok || latticeURL == "" {
log.Fatalf("LATTICE_URL environment variable is not set or is empty.")
}
ctx := context.Background()
opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
}
conn, err := grpc.NewClient(latticeURL, opts...)
if err != nil {
log.Fatalf("Did not connect: %v", err)
}
defer conn.Close()
em := entitymanagerv1.NewEntityManagerAPIClient(conn)
// Start stream for all Entity Events including all components in the entity.
streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
IncludeAllComponents: true,
})
if err != nil {
log.Fatalf("Error initiating entity stream: %v", err)
}
for {
// Receive single entity event response.
resp, err := streamClient.Recv()
if err != nil {
log.Fatalf("Error receiving from entity stream: %v", err)
}
log.Printf("Received entity event: %v", resp.GetEntityEvent())
}
}
Use the stream_entity_components
API action to instantiate the stream. You can create a new file, or replace the existing
code in main.rs
as shown:
let mut stream = em_client.stream_entity_components(StreamEntityComponentsRequest{
components_to_include: vec![],
heartbeat_period_millis: 250,
include_all_components: true,
filter: None,
rate_limit: None,
preexisting_only: false,
}).await?.into_inner();
main.rs
use anduril_lattice_sdk::anduril::entitymanager::v1::entity_manager_api_client::EntityManagerApiClient;
use anduril_lattice_sdk::anduril::entitymanager::v1::{GetEntityRequest, GetEntityResponse};
use tonic::metadata::MetadataValue;
use tonic::transport::{ClientTlsConfig, Channel};
use tonic::{Request, Status};
async fn stream_entities() -> Result<(), Box<dyn std::error::Error>>{
// Retrieve your token and Lattice URL from system environment variables.
let token = env::var("BEARER_TOKEN").expect("BEARER_TOKEN is not set in environment variables");
let url = env::var("LATTICE_URL").expect("LATTICE_URL is not set in environment variables");
let bearer_token = format!("Bearer {}", token);
let lattice_url = format!("{}", url);
let header_value: MetadataValue<_> = bearer_token.parse().map_err(|_| Status::internal("Invalid Bearer Token"))?;
let tls_config = ClientTlsConfig::new().with_native_roots();
let registration_channel = Channel::from_shared(lattice_url)
.map_err(|e| Status::invalid_argument(format!("Invalid HTTP endpoint: {}", e)))?
.tls_config(ClientTlsConfig::new().with_native_roots())
.map_err(|_| Status::internal("TLS configuration failed"))?
.connect()
.await
.map_err(|e| Status::unavailable(format!("Failed to connect: {}", e)))?;
let mut em_client = EntityManagerApiClient::with_interceptor(registration_channel, |mut req: Request<()>| {
req.metadata_mut().insert("authorization", header_value.clone());
Ok(req)
});
let mut stream = em_client.stream_entity_components(StreamEntityComponentsRequest{
components_to_include: vec![],
heartbeat_period_millis: 250,
include_all_components: true,
filter: None,
rate_limit: None,
preexisting_only: false,
}).await?.into_inner();
while let Some(resp) = stream.message().await? {
let resp: StreamEntityComponentsResponse = resp;
println!("Received event: {:?}", resp);
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let stream = stream_entities().await;
Ok(stream?)
}
Use the stream_entity_components
API and set include_all_components
to true
to stream all components.
You can create a new file, or replace the existing code in main.cpp
as shown:
anduril::entitymanager::v1::StreamEntityComponentsRequest req;
req.set_include_all_components(true);
main.cpp
#include <cstdlib>
#include <string>
#include <grpc/grpc.h>
#include <grpcpp/channel.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/security/tls_credentials_options.h>
#include <anduril/entitymanager/v1/entity_manager_grpcapi.pub.grpc.pb.h>
using anduril::entitymanager::v1::EntityManagerApi;
using anduril::entitymanager::v1::StreamEntityComponentsRequest;
using anduril::entitymanager::v1::StreamEntityComponentsResponse;
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
int main()
{
std::string bearer_token = std::getenv("BEARER_TOKEN");
std::string lattice_url = std::getenv("LATTICE_URL");
if (bearer_token.empty() || lattice_url.empty())
{
std::cerr << "Make sure your Lattice URL and bearer token have been set as system environment variables." << std::endl;
return 1;
}
// Authorize using your bearer token.
grpc::ClientContext ctx;
ctx.AddMetadata("authorization", "Bearer " + std::string(bearer_token));
// Open a secure channel using your Lattice URL
auto creds = grpc::SslCredentials(grpc::SslCredentialsOptions());
std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(lattice_url, creds);
// Create a new stub for the service instance.
std::shared_ptr<anduril::entitymanager::v1::EntityManagerAPI::Stub> mStub =
anduril::entitymanager::v1::EntityManagerAPI::NewStub(channel);
// Define the request and response objects.
anduril::entitymanager::v1::StreamEntityComponentsRequest req;
req.set_include_all_components(true);
anduril::entitymanager::v1::StreamEntityComponentsResponse res;
grpc::Status status = mStub->StreamEntityComponents(&ctx, req, &res);
while (reader->Read(&res))
{
std::cout << res.entity_event().entity().DebugString() << std::endl;
}
Status status = reader->Finish();
if (!status.ok())
{
std::cerr << "StreamEntityComponents failed: " << status.error_message() << std::endl;
}
return 0;
}
Use streamEntityComponents
and pass setIncludeAllComponents(true)
to your function call to stream all components.
You can create a new file, or replace the existing code in EntityManagerClient.java
as shown:
this.serviceStub.streamEntityComponents(StreamEntityComponentsRequest.newBuilder().setIncludeAllComponents(true).build(), new StreamObserver<StreamEntityComponentsResponse>() {
@Override
public void onNext(StreamEntityComponentsResponse value) {
System.out.println(value);
}
@Override
public void onError(Throwable t) {
Logger.getLogger(MyEntityManagerClient.class.getName()).log(Level.SEVERE, null, t);
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
});
App.java
package org.example;
public class App {
/**
* The main method of the application. Creates an instance of
* MyEntityManagerClient, and calls its getEntity method to retrieve an
* entity. It then prints the entity ID to the console.
*/
public static void main(String[] args) {
MyEntityManagerClient entityManager = new MyEntityManagerClient();
try {
// Stream all entities with all components whenever there
// is a change in any component of an entity.
entityManager.streamAllEntityComponents();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
EntityManagerClient.java
// You might need to modify this statement depending on
// ßthe location of your file relative to the project.
package org.example;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;
import com.anduril.entitymanager.v1.EntityManagerAPIGrpc;
import com.anduril.entitymanager.v1.EntityManagerAPIGrpc.EntityManagerAPIStub;
import com.anduril.entitymanager.v1.StreamEntityComponentsRequest;
import com.anduril.entitymanager.v1.StreamEntityComponentsResponse;
import io.grpc.ChannelCredentials;
import io.grpc.ClientInterceptor;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.TlsChannelCredentials;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.grpc.util.AdvancedTlsX509TrustManager;
import io.grpc.util.AdvancedTlsX509TrustManager.Verification;
/**
* This class represents a client for interacting with the EntityManager API. It
* uses gRPC to communicate with the server and provides methods for retrieving
* entities.
*/
public class MyEntityManagerClient {
/**
* The stub for the EntityManagerAPI. This is the client-side implementation
* of the API interface.
*/
private EntityManagerAPIStub serviceStub;
/**
* Constructs a new instance of the client. Creates a channel to connect to
* the server using the URL of the host. Creates an interceptor to apply
* authorization headers to requests. Initializes the service stub with the
* channel and interceptor.
*/
public MyEntityManagerClient() {
// Retrieve your bearer token and Lattice URL from system environment variables.
String bearerToken = System.getenv("BEARER_TOKEN");
String latticeUrl = System.getenv("LATTICE_URL");
if (bearerToken == null || latticeUrl == null) {
System.err.println("Make sure your Lattice URL and bearer token have been set as system environment variables.");
System.exit(1);
}
// Creates a channel object to connect to Lattice using your environment's URL.
ManagedChannel channel = ManagedChannelBuilder.forAddress(latticeUrl, 443).useTransportSecurity().build();
// Creates a metadata object to apply an authorization header to requests.
Metadata header = new Metadata();
Metadata.Key<String> key = Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
header.put(key, "Bearer " + bearerToken);
// Attach the header to the stub.
ClientInterceptor interceptor = MetadataUtils.newAttachHeadersInterceptor(header);
this.serviceStub = EntityManagerAPIGrpc.newStub(channel).withInterceptors(interceptor);
}
/**
* Streams all entities and all components whenever there is a change in any
* component of an entity.
*
* @param entityId the ID of the entity to retrieve.
* @throws InterruptedException if the operation is interrupted.
*/
public void streamAllEntityComponents() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
this.serviceStub.streamEntityComponents(StreamEntityComponentsRequest.newBuilder().setIncludeAllComponents(true).build(), new StreamObserver<StreamEntityComponentsResponse>() {
@Override
public void onNext(StreamEntityComponentsResponse value) {
System.out.println(value);
}
@Override
public void onError(Throwable t) {
Logger.getLogger(MyEntityManagerClient.class.getName()).log(Level.SEVERE, null, t);
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
});
latch.await();
}
}
Use streamEntityComponents
to stream components. Set the includeAllComponents
attribute to true
to stream all components.
Replace the existing code in App.ts
as shown:
const response = client.streamEntityComponents(
{ includeAllComponents: true },
{ headers: headers }
)
App.ts
import { EntityManagerAPI } from "@anduril-industries/lattice-sdk/src/anduril/entitymanager/v1/entity_manager_grpcapi.pub_pb.js";
import { createGrpcTransport } from "@connectrpc/connect-node";
import { createClient } from "@connectrpc/connect";
// Retrieve the Lattice URL environment variable
const lattice_url = process.env.LATTICE_URL;
// Retrieve the bearer token environment variable.
const bearer_token = process.env.BEARER_TOKEN;
if (!lattice_url || !bearer_token) {
console.error("Make sure your Lattice URL and bearer token have been set as system environment variables.");
process.exit(1);
}
// Create the gRPC transport using your environment URL.
const transport = createGrpcTransport({
baseUrl: baseUrl: "https://" + lattice_url
});
async function streamEntities(iterator: AsyncIterable<any>) {
for await (const entity of iterator) {
// Process each item from the iterator.
console.log(entity);
}
}
async function App() {
// Create the EntityManagerAPI client using the grpc transport.
const client = createClient(EntityManagerAPI, transport);
// Set the bearer token.
const headers = new Headers();
headers.set("Authorization", "Bearer " + bearer_token);
streamEntities(client.streamEntityComponents(
{ includeAllComponents: true },
{ headers: headers }
));
}
void App();
Use the stream_entity_components
API action to instantiate the stream. You can create a new file, or replace the existing
code in app.py
as shown:
stream = entity_manager_stub.stream_entity_components(
StreamEntityComponentsRequest(include_all_components=True),
metadata=metadata
)
app.py
from anduril.entitymanager.v1 import EntityManagerApiStub, StreamEntityComponentsRequest
from grpclib.client import Channel
import asyncio
import os
import sys
# Retrieve the bearer token and your Lattice URL from system environment variables.
bearer_token = os.getenv('BEARER_TOKEN')
lattice_url = os.getenv('LATTICE_URL')
if not bearer_token or not lattice_url:
print("Make sure your Lattice URL and bearer token have been set as system environment variables.")
sys.exit(1)
metadata = {
'authorization': f"Bearer {bearer_token}"
}
async def stream_entities():
# open secure channel
channel = Channel(host=lattice_url, port=443, ssl=True)
# create service instance
entity_manager_stub = EntityManagerApiStub(channel)
# Stream all entities with all components.
stream = entity_manager_stub.stream_entity_components(
StreamEntityComponentsRequest(include_all_components=True),
metadata=metadata
)
async for event in stream:
print(event.entity_event.entity)
if __name__ == "__main__":
raise SystemExit(asyncio.run(stream_entities()))
Use the /entities/events
endpoint to stream all entities. You must set "includeAllComponents"
to true
in the request body to stream all components.
curl -L -X POST 'https://$YOUR_LATTICE_URL/api/v1/entities/events' \\
-H 'Authorization: Bearer $YOUR_BEARER_TOKEN' \\
-H 'Content-Type: application/json' \\
-H 'Accept: application/json' \\
--data-raw '{ "includeAllComponents": true }'
Stream specific components
You can specify a subset of components to stream from the entity
object by
providing the component in snake_case
. The StreamEntityComponents
API
only streams changes to the specified components:
- Go
- Rust
- JavaScript
- Python
Use Lattice SDK for Go to stream entities containing only the location_uncertainty
& power_state
components.
The resulting stream will produce events when any components change, but only include the specified components:
// Start stream for Entity Events containing a change in the following components: location_uncertainty, power_state
streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
ComponentsToInclude: []string{"location_uncertainty", "power_state"},
})
lattice_client.go
package main
import (
"context"
"log"
"os"
entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// BearerTokenAuth supplies PerRPCCredentials from a given token.
type BearerTokenAuth struct {
Token string
}
// GetRequestMetadata gets the current request metadata, adding the bearer token.
func (b *BearerTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "Bearer " + b.Token,
}, nil
}
// RequireTransportSecurity indicates whether the credentials requires transport security.
func (b *BearerTokenAuth) RequireTransportSecurity() bool {
return true // or false if you are developing/testing without TLS.
}
func main() {
// Retrieve the bearer token and lattice URL from environment variables.
bearerToken, ok := os.LookupEnv("BEARER_TOKEN")
if !ok || bearerToken == "" {
log.Fatalf("BEARER_TOKEN environment variable is not set or is empty.")
}
latticeURL, ok := os.LookupEnv("LATTICE_URL")
if !ok || latticeURL == "" {
log.Fatalf("LATTICE_URL environment variable is not set or is empty.")
}
ctx := context.Background()
opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
}
conn, err := grpc.NewClient(latticeURL, opts...)
if err != nil {
log.Fatalf("Did not connect: %v", err)
}
defer conn.Close()
em := entitymanagerv1.NewEntityManagerAPIClient(conn)
// Start stream for events whenever any components change, but include only the following
// in reponse: location_uncertainty, power_state
streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
ComponentsToInclude: []string{"location_uncertainty", "power_state"},
})
if err != nil {
log.Fatalf("Error initiating entity stream: %v", err)
}
for {
// Receive single entity event response
resp, err := streamClient.Recv()
if err != nil {
log.Fatalf("Error receiving from entity stream: %v", err)
}
log.Printf("Received entity event: %v", resp.GetEntityEvent())
}
}
Use Lattice SDK for Rust to stream entities containing only the location_uncertainty
& aliases
components.
The resulting stream will only produce events when any components change, but only include the specified components:
// Start stream for Entity Events containing a change in the following components: location_uncertainty, aliases
let mut stream = em_client.stream_entity_components(StreamEntityComponentsRequest{
components_to_include: vec!["location_uncertainty".to_string(), "aliases".to_string()],
heartbeat_period_millis: 250,
include_all_components: false,
filter: None,
rate_limit: None,
preexisting_only: false,
}).await?.into_inner();
main.rs
use anduril_lattice_sdk::anduril::entitymanager::v1::entity_manager_api_client::EntityManagerApiClient;
use anduril_lattice_sdk::anduril::entitymanager::v1::{GetEntityRequest, GetEntityResponse};
use tonic::metadata::MetadataValue;
use tonic::transport::{ClientTlsConfig, Channel};
use tonic::{Request, Status};
async fn stream_entities_subcomponents() -> Result<(), Box<dyn std::error::Error>>{
// Retrieve your token and Lattice URL from system environment variables.
let token = env::var("BEARER_TOKEN").expect("BEARER_TOKEN is not set in environment variables");
let url = env::var("LATTICE_URL").expect("LATTICE_URL is not set in environment variables");
let bearer_token = format!("Bearer {}", token);
let lattice_url = format!("{}", url);
let header_value: MetadataValue<_> = bearer_token.parse().map_err(|_| Status::internal("Invalid Bearer Token"))?;
let tls_config = ClientTlsConfig::new().with_native_roots();
let registration_channel = Channel::from_shared(http_endpoint)
.map_err(|e| Status::invalid_argument(format!("Invalid HTTP endpoint: {}", e)))?
.tls_config(tls_config)
.map_err(|_| Status::internal("TLS configuration failed"))?
.connect()
.await
.map_err(|e| Status::unavailable(format!("Failed to connect: {}", e)))?;
let mut em_client = EntityManagerApiClient::with_interceptor(registration_channel, |mut req: Request<()>| {
req.metadata_mut().insert("authorization", header_value.clone());
Ok(req)
});
let mut stream = em_client.stream_entity_components(StreamEntityComponentsRequest{
components_to_include: vec!["location_uncertainty".to_string(), "aliases".to_string()],
heartbeat_period_millis: 250,
include_all_components: false,
filter: None,
rate_limit: None,
preexisting_only: false,
}).await?.into_inner();
while let Some(resp) = stream.message().await? {
let resp: StreamEntityComponentsResponse = resp;
println!("Received event: {:?}", resp);
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let stream = stream_entities_subcomponents().await;
Ok(stream?)
}
Use the Lattice SDK for JavaScript to stream entities containing only the location_uncertainty
& aliases
components.
The resulting stream will only produce events when any components change, but only include the specified components:
const response = client.streamEntityComponents(
{ componentsToInclude: ["location_uncertainty", "aliases"] },
{ headers: headers }
)
App.ts
import { EntityManagerAPI } from "@anduril-industries/lattice-sdk/src/anduril/entitymanager/v1/entity_manager_grpcapi.pub_pb.js";
import { createGrpcTransport } from "@connectrpc/connect-node";
import { createClient } from "@connectrpc/connect";
// Retrieve the Lattice URL environment variable
const lattice_url = process.env.LATTICE_URL;
// Retrieve the bearer token environment variable.
const bearer_token = process.env.BEARER_TOKEN;
if (!lattice_url || !bearer_token) {
console.error("Make sure your Lattice URL and bearer token have been set as system environment variables.");
process.exit(1);
}
// Create the gRPC transport using your environment URL.
const transport = createGrpcTransport({
baseUrl: baseUrl: "https://" + lattice_url
});
async function streamEntities(iterator: AsyncIterable<any>) {
for await (const entity of iterator) {
// Process each item from the iterator.
console.log(entity);
}
}
async function App() {
// Create the EntityManagerAPI client using the grpc transport.
const client = createClient(EntityManagerAPI, transport);
// Set the bearer token.
const headers = new Headers();
headers.set("Authorization", "Bearer " + bearer_token);
streamEntities(client.streamEntityComponents(
{ componentsToInclude: ["location_uncertainty", "aliases"] },
{ headers: headers }
));
}
void App();
Use the Lattice SDK for Python to stream entities containing only the location_uncertainty
& aliases
components.
The resulting stream will only produce events when any components change, but only include the specified components:
stream = entity_manager_stub.stream_entity_components(
StreamEntityComponentsRequest(components_to_include=["location_uncertainty", "alias"]),
metadata=metadata
)
app.py
from anduril.entitymanager.v1 import EntityManagerApiStub, StreamEntityComponentsRequest
from grpclib.client import Channel
import asyncio
import os
import sys
# Retrieve the bearer token and environment URL from environment variables.
bearer_token = os.getenv('BEARER_TOKEN')
lattice_url = os.getenv('LATTICE_URL')
if not bearer_token or not lattice_url:
print("Make sure your Lattice URL and bearer token have been set as system environment variables.")
sys.exit(1)
metadata = {
'authorization': f"Bearer {bearer_token}"
}
async def stream_entities():
# Open a secure channel using your environment URL.
channel = Channel(host=lattice_url, port=443, ssl=True)
# Create an API stub for the EntityManager.
entity_manager_stub = EntityManagerApiStub(channel)
# Stream all entities with all components.
stream = entity_manager_stub.stream_entity_components(
StreamEntityComponentsRequest(components_to_include=["location_uncertainty", "alias"]),
metadata=metadata
)
async for event in stream:
print(event.entity_event.entity)
if __name__ == "__main__":
raise SystemExit(asyncio.run(stream_entities()))
Apply filters
This feature is only available using Lattice SDKs for gRPC.
The StreamEntityComponents
API allows for any field on the
entity
component to be filtered
against.
OR
operator
The following is an example of an OR
predicate filter. This filter matches an entity if
mil_view.disposition
is SUSPICIOUS
or HOSTILE
. The disposition value is an enum,
and the JSON value corresponds to a number:
- Go
- Javascript
lattice_client.go
package main
import (
"context"
"log"
"os"
entitymanagerv1 "github.com/anduril/lattice-sdk-go/src/anduril/entitymanager/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
func App() {
// Retrieve the bearer token and lattice URL from environment variables
bearerToken, ok := os.LookupEnv("BEARER_TOKEN")
if !ok || bearerToken == "" {
log.Fatalf("BEARER_TOKEN environment variable is not set or is empty.")
}
latticeURL, ok := os.LookupEnv("LATTICE_URL")
if !ok || latticeURL == "" {
log.Fatalf("LATTICE_URL environment variable is not set or is empty.")
}
ctx := context.Background()
opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
grpc.WithPerRPCCredentials(&BearerTokenAuth{Token: bearerToken}),
}
conn, err := grpc.NewClient(latticeURL, opts...)
if err != nil {
log.Fatalf("Did not connect: %v", err)
}
defer conn.Close()
em := entitymanagerv1.NewEntityManagerAPIClient(conn)
// Filter Statement that defines 2 OR predicates that filter suspicious or
// hostile entities.
filter := &entitymanagerv1.Statement{
Operation: &entitymanagerv1.Statement_Or{
Or: &entitymanagerv1.OrOperation{
Children: &entitymanagerv1.OrOperation_PredicateSet{
PredicateSet: &entitymanagerv1.PredicateSet{
Predicates: []*entitymanagerv1.Predicate{
{
FieldPath: "mil_view.disposition",
Value: &entitymanagerv1.Value{
Type: &entitymanagerv1.Value_EnumType{
EnumType: &entitymanagerv1.EnumType{
Value: int32(ontologyv1.Disposition_DISPOSITION_HOSTILE),
},
},
},
Comparator: entitymanagerv1.Comparator_COMPARATOR_EQUALITY,
},
{
FieldPath: "mil_view.disposition",
Value: &entitymanagerv1.Value{
Type: &entitymanagerv1.Value_EnumType{
EnumType: &entitymanagerv1.EnumType{
Value: int32(ontologyv1.Disposition_DISPOSITION_SUSPICIOUS),
},
},
},
Comparator: entitymanagerv1.Comparator_COMPARATOR_EQUALITY,
},
},
},
},
},
},
}
streamClient, err := em.StreamEntityComponents(ctx, &entitymanagerv1.StreamEntityComponentsRequest{
Filter: filter,
IncludeAllComponents: true,
})
if err != nil {
log.Fatalf("Error initiating entity stream: %v", err)
}
for {
// Receive single entity event response.
resp, err := streamClient.Recv()
if err != nil {
log.Fatalf("Error receiving from entity stream: %v", err)
}
log.Printf("Received filtered entity event: %v", resp.GetEntityEvent())
}
}
index.ts
import { EntityManagerAPI } from "@anduril-industries/lattice-sdk/src/anduril/entitymanager/v1/entity_manager_grpcapi.pub_pb.js";
import { Disposition } from "@anduril-industries/lattice-sdk/src/anduril/ontology/v1/type.pub_pb.js";
import { Comparator } from "@anduril-industries/lattice-sdk/src/anduril/entitymanager/v1/filter.pub_pb.js";
import { createGrpcTransport } from "@connectrpc/connect-node";
import { createClient } from "@connectrpc/connect";
// Retrieve the Lattice URL environment variable
const lattice_url = process.env.LATTICE_URL;
// Retrieve the bearer token environment variable.
const bearer_token = process.env.BEARER_TOKEN;
if (!lattice_url || !bearer_token) {
console.error("Make sure your Lattice URL and bearer token \
have been set as system environment variables.");
process.exit(1);
}
// Create the gRPC transport using your environment URL.
const transport = createGrpcTransport({
baseUrl: baseUrl: "https://" + lattice_url
});
const client = createClient(EntityManagerAPI, transport);
const filter = {
operation: {
or: {
predicateSet: {
predicates: [
{
fieldPath: "mil_view.disposition",
value: {
enumType: {
value: Disposition.HOSTILE
}
},
comparator: Comparator.EQUALITY
},
{
fieldPath: "mil_view.disposition",
value: {
enumType: {
value: Disposition.SUSPICIOUS
}
},
comparator: Comparator.EQUALITY
}
]
}
}
}
};
async function streamEntities(iterator: AsyncIterable<any>) {
for await (const entity of iterator) {
// Process each item from the iterator.
console.log(entity);
}
}
async function App() {
try {
const streamClient = client.streamEntityComponents({
filter: filter,
includeAllComponents: true,
});
await streamEntities(streamClient);
} catch (err) {
console.error("Error initiating entity stream:", err);
}
}
App();
List
operator
The following is an example of an List
predicate filter. This filter matches all entities being tracked
by a specific entity:
- Java
EntityManagerClient.java
// You might need to modify this statement depending
// on the location of your file relative to the project.
package org.example;
import com.anduril.entitymanager.v1.Comparator;
import com.anduril.entitymanager.v1.EntityManagerAPIGrpc;
import com.anduril.entitymanager.v1.EntityManagerAPIGrpc.EntityManagerAPIStub;
import com.anduril.entitymanager.v1.ListComparator;
import com.anduril.entitymanager.v1.ListOperation;
import com.anduril.entitymanager.v1.OrOperation;
import com.anduril.entitymanager.v1.Predicate;
import com.anduril.entitymanager.v1.Statement;
import com.anduril.entitymanager.v1.StatementSet;
import com.anduril.entitymanager.v1.StreamEntityComponentsRequest;
import com.anduril.entitymanager.v1.StreamEntityComponentsResponse;
import com.anduril.entitymanager.v1.StringType;
import com.anduril.entitymanager.v1.Value;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
/**
* This class represents a client for interacting with the EntityManager API. It
* uses gRPC to communicate with the server and provides methods for retrieving
* entities.
*/
public class MyEntityManagerClient {
/**
* The stub for the EntityManagerAPI. This is the client-side implementation
* of the API interface.
*/
private EntityManagerAPIStub serviceStub;
/**
* Constructs a new instance of the client. Creates a channel to connect to
* the server using the URL of the host. Creates an interceptor to apply
* authorization headers to requests. Initializes the service stub with the
* channel and interceptor.
*/
public MyEntityManagerClient() {
String bearerToken = System.getenv("BEARER_TOKEN");
String latticeUrl = System.getenv("LATTICE_URL");
if (bearerToken == null || latticeUrl == null) {
System.err.println("Make sure your Lattice URL and bearer token have been set as system environment variables.");
System.exit(1);
}
// Create a channel object to connect to Lattice using your environment's URL
ManagedChannel channel = ManagedChannelBuilder.forAddress(latticeUrl, 443).useTransportSecurity()
.build();
// Create a metadata object to apply an authorization header to requests.
Metadata header = new Metadata();
Metadata.Key<String> key = Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
header.put(key, "Bearer " + bearerToken);
// Create an interceptor to apply authorization headers to requests.
ClientInterceptor interceptor = MetadataUtils.newAttachHeadersInterceptor(header);
// Initialize the service stub with the channel and interceptor.
this.serviceStub = EntityManagerAPIGrpc.newStub(channel).withInterceptors(interceptor);
}
/**
* Streams all entities that are being tracked by a specified entity.
*
* @param entityId The ID of the specified entity that is tracking others.
* @throws InterruptedException If the thread is interrupted while waiting
* for the response.
*/
public void streamFilteredEntities(String entityId) throws InterruptedException {
// Creates a StreamObserver to handle the response from the server.
StreamObserver<StreamEntityComponentsResponse> responseObserver = new StreamObserver<StreamEntityComponentsResponse>() {
// Invoked when a new message is received from Lattice.
@Override
public void onNext(StreamEntityComponentsResponse value) {
// Print the received value.
System.out.println(value);
}
// Invoked when an error occurs during the stream operation.
@Override
public void onError(Throwable t) {
// Print the stack trace for the error.
t.printStackTrace();
}
// Invoked when the stream operation is completed.
@Override
public void onCompleted() {
// Print a message indicating the stream is finished.
System.out.println("Finished receiving data from server");
}
};
// Define the comparator statement for your filter. Set the field path
// to entity_id in order to match the entity that is tracking others
// in your environment.
Statement entityIdStatement = Statement.newBuilder()
.setPredicate(Predicate.newBuilder()
.setFieldPath("entity_id")
.setComparator(Comparator.COMPARATOR_CASE_INSENSITIVE_EQUALITY)
.setValue(Value.newBuilder()
.setStringType(StringType.newBuilder()
.setValue(entityId)
.build()))
.build())
.build();
// Define the list comparator statement for your filter. Set the list path
// to relationship.relationship in order to match any entity whose
// related_entity_id property equals the specified entity in the first statement.
Statement trackingEntityStatement = Statement.newBuilder()
.setList(ListOperation.newBuilder()
.setListPath("relationships.relationships")
.setListComparator(ListComparator.LIST_COMPARATOR_ANY_OF)
.setStatement(Statement.newBuilder()
.setPredicate(Predicate.newBuilder()
.setFieldPath("related_entity_id")
.setComparator(Comparator.COMPARATOR_CASE_INSENSITIVE_EQUALITY)
.setValue(Value.newBuilder()
.setStringType(StringType.newBuilder()
.setValue(entityId)
.build()))))
.build())
.build();
// Create a statement set using the two previously defined statements.
StatementSet statements = StatementSet.newBuilder()
.addStatements(entityIdStatement)
.addStatements(trackingEntityStatement)
.build();
// Create a OrOperation filter type.
OrOperation filterOperation = OrOperation.newBuilder()
.setStatementSet(statements)
.build();
Statement filter = Statement.newBuilder()
.setOr(filterOperation)
.build();
// Stream and filter the entities from Lattice.
StreamEntityComponentsRequest request = StreamEntityComponentsRequest.newBuilder()
.setFilter(filter)
.setIncludeAllComponents(true)
.build();
// Sends the request to the server and passes in the response observer to handle
// the response.
this.serviceStub.streamEntityComponents(request, responseObserver);
}
}