The TRUF.NETWORK SDK provides developers with tools to interact with the TRUF.NETWORK, a decentralized platform for publishing, composing, and consuming economic data streams.
If you need help, don't hesitate to open an issue.
- Go 1.20 or later
- Docker (for local node setup)
- A local TN node (optional, for local testing)
go get github.com/trufnetwork/sdk-go
Prerequisites:
- Docker
- Docker Compose
- Git
Clone the TN Node Repository:
git clone https://github.com/trufnetwork/node.git
cd node
Start the Local Node:
# Start the node in development mode
task single:start
Note: Setting up a local node as described above will initialize an empty database. This setup is primarily for testing the technology or development purposes. If you are a node operator and wish to sync with the TRUF.NETWORK to access real data, please follow the Node Operator Guide for instructions on connecting to the network and syncing data.
Verify Node Synchronization
When running a local node, it's crucial to ensure it's fully synchronized before querying data. If you are running as a node operator or are connected to the network, use the following command to check node status:
kwild admin status
Note: If you are running a setup without operating as a node operator or connecting to the network, this command is not needed.
Here's an example of querying the AI Index stream from a local node:
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/trufnetwork/kwil-db/core/crypto"
"github.com/trufnetwork/kwil-db/core/crypto/auth"
"github.com/trufnetwork/sdk-go/core/tnclient"
"github.com/trufnetwork/sdk-go/core/types"
)
func main() {
ctx := context.Background()
// Set up local node connection
pk, err := crypto.Secp256k1PrivateKeyFromHex("your-private-key")
if err != nil {
log.Fatalf("Failed to parse private key: %v", err)
}
signer := &auth.EthPersonalSigner{Key: *pk}
// Connect to local node
tnClient, err := tnclient.NewClient(
ctx,
"http://localhost:8484", // Local node endpoint
tnclient.WithSigner(signer),
)
if err != nil {
log.Fatalf("Failed to create TN client: %v", err)
}
// AI Index stream details
dataProvider := "0x4710a8d8f0d845da110086812a32de6d90d7ff5c"
streamId := "stai0000000000000000000000000000"
// Retrieve records from the last week
now := time.Now()
weekAgo := now.AddDate(0, 0, -7)
fromTime := int(weekAgo.Unix())
toTime := int(now.Unix())
composedActions, err := tnClient.LoadComposedActions()
if err != nil {
log.Fatalf("Failed to load composed actions: %v", err)
}
result, err := composedActions.GetRecord(ctx, types.GetRecordInput{
DataProvider: dataProvider,
StreamId: streamId,
From: &fromTime,
To: &toTime,
})
if err != nil {
log.Fatalf("Failed to retrieve records: %v", err)
}
// Display retrieved records
fmt.Println("AI Index Records from Local Node:")
for _, record := range result.Results {
fmt.Printf("Event Time: %d, Value: %s\n",
record.EventTime,
record.Value.String(),
)
}
}
- Ensure your local node is fully synchronized
- Check network connectivity
- Verify private key and authentication
- Review node logs for any synchronization issues
A mainnet gateway is available at https://gateway.mainnet.truf.network. You can interact with it to test and experiment with the TN SDK. Please use it responsibly. Contributions and feedback are welcome.
To connect to the mainnet, simply change the endpoint in your client initialization:
package main
import (
"context"
"fmt"
"log"
"github.com/trufnetwork/kwil-db/core/crypto"
"github.com/trufnetwork/kwil-db/core/crypto/auth"
"github.com/trufnetwork/sdk-go/core/tnclient"
)
func main() {
ctx := context.Background()
// Set up mainnet connection
pk, err := crypto.Secp256k1PrivateKeyFromHex("your-private-key")
if err != nil {
log.Fatalf("Failed to parse private key: %v", err)
}
signer := &auth.EthPersonalSigner{Key: *pk}
tnClient, err := tnclient.NewClient(
ctx,
"https://gateway.mainnet.truf.network", // Mainnet endpoint
tnclient.WithSigner(signer),
)
if err != nil {
log.Fatalf("Failed to create TN client: %v", err)
}
// Now you can perform operations on the mainnet
fmt.Println("Connected to TRUF.NETWORK Mainnet")
}
- Primitive Streams: Direct data sources from providers. Examples include indexes from known sources, aggregation output such as sentiment analysis, and off-chain/on-chain data.
- Composed Streams: Aggregate and process data from multiple streams.
- System Streams: Contract-managed streams audited and accepted by TN governance to ensure quality.
See the type of streams and default TN contracts guides for more information. The sketch below shows how the stream type is fixed at deployment time.
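As a quick illustration (reusing the tnClient setup from the examples in this document, with error handling omitted), the stream type is chosen when a stream is deployed:
// Primitive streams hold raw data points.
deployPrimitiveTx, err := tnClient.DeployStream(ctx, primitiveStreamId, types.StreamTypePrimitive)
// Composed streams aggregate child streams according to a taxonomy.
deployComposedTx, err := tnClient.DeployStream(ctx, composedStreamId, types.StreamTypeComposed)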
- Data Providers: Publish and maintain data streams, taxonomies, and push primitives.
- Consumers: Access and utilize stream data. Examples include researchers, analysts, financial institutions, and DApp developers.
- Node Operators: Maintain network infrastructure and consensus. Note: The network is currently in a centralized phase during development. Decentralization is planned for future releases. This repository does not handle node operation.
Stream IDs are unique identifiers generated for each stream. They ensure consistent referencing across the network.
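For example, the SDK's util package (assumed here to live at github.com/trufnetwork/sdk-go/core/util, the package used by the examples below) generates a stream ID from a human-readable name:
import "github.com/trufnetwork/sdk-go/core/util"
// Generate a stream ID from a descriptive name.
myStreamId := util.GenerateStreamId("my-market-data-stream")
// The string form is what stream operations expect.
fmt.Println(myStreamId.String())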
- Record: Data points used to calculate indexes. For a primitive stream, records are the raw data points; for a composed stream, records are the weighted values aggregated from its child streams.
- Index: Calculated values derived from stream data, representing a value's growth relative to the stream's first record (see the sketch after this list).
- Primitives: Raw data points provided by data sources.
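To make the record/index distinction concrete, here is a conceptual sketch using plain Go values rather than SDK types, with the first record taken as an illustrative base of 100:
// Records: raw values over time for a primitive stream.
records := []float64{50.0, 55.0, 60.5}
// Index: each value expressed relative to the stream's first record.
base := records[0]
for i, v := range records {
	fmt.Printf("t%d: record=%.2f index=%.2f\n", i, v, 100*v/base)
}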
Critical Understanding: TN operations return success when transactions enter the mempool, NOT when they're executed on-chain. For operations where order matters, you must wait for transactions to be mined before proceeding.
💡 See Complete Example: For a comprehensive demonstration of all transaction lifecycle patterns, see examples/transaction-lifecycle-example/main.go.
// ❌ DANGEROUS - Race condition possible
deployTx, err := tnClient.DeployStream(ctx, streamId, types.StreamTypePrimitive)
// Stream might not be ready yet!
insertTx, err := primitiveActions.InsertRecord(ctx, input) // Could fail
destroyTx, err := tnClient.DestroyStream(ctx, streamId)
// Stream might not be destroyed yet!
insertTx, err := primitiveActions.InsertRecord(ctx, input) // Could succeed unexpectedly
import (
kwiltypes "github.com/trufnetwork/kwil-db/core/types"
// ... other imports
)
// ✅ SAFE - Explicit transaction confirmation
deployTx, err := tnClient.DeployStream(ctx, streamId, types.StreamTypePrimitive)
if err != nil {
return err
}
// Wait for deployment to complete
txRes, err := tnClient.WaitForTx(ctx, deployTx, time.Second*5)
if err != nil {
return err
}
if txRes.Result.Code != uint32(kwiltypes.CodeOk) {
return fmt.Errorf("deployment failed: %d", txRes.Result.Code)
}
// Now safe to proceed
insertTx, err := primitiveActions.InsertRecord(ctx, input)
import (
client "github.com/trufnetwork/kwil-db/core/client/types"
// ... other imports
)
// For operations that support TxOpt:
insertTx, err := primitiveActions.InsertRecord(ctx, input,
client.WithSyncBroadcast(true)) // Waits for mining
// Note: DeployStream and DestroyStream don't support TxOpt,
// so you must use WaitForTx with them
- ✅ Stream deployment before data insertion
- ✅ Stream deletion before cleanup verification
- ✅ Sequential operations with dependencies
- ✅ Testing and development scenarios
- ✅ When immediate error detection is critical
- ⚡ High-throughput data insertion (independent records)
- ⚡ Fire-and-forget operations (with proper error handling)
- ⚡ Batch operations where order within the batch doesn't matter (see the batch sketch after this list)
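For the high-throughput case, a common approach is to batch independent records into a single InsertRecords call and confirm once at the end. A minimal sketch, assuming the client, stream, and primitiveActions are set up as in the examples above and kwiltypes is imported as shown earlier:
// Example values to insert; in practice these come from your data source.
values := []float64{101.2, 101.9, 102.4}
// Build a batch of independent records.
recordsBatch := make([]types.InsertRecordInput, 0, len(values))
for i, v := range values {
	recordsBatch = append(recordsBatch, types.InsertRecordInput{
		DataProvider: tnClient.Address().Address(),
		StreamId:     streamId.String(),
		EventTime:    int(time.Now().Add(time.Duration(i) * time.Minute).Unix()),
		Value:        v,
	})
}
// Submit the whole batch in one transaction.
insertTx, err := primitiveActions.InsertRecords(ctx, recordsBatch)
if err != nil {
	log.Fatalf("Failed to insert batch: %v", err)
}
// Confirm once for the batch instead of per record.
txRes, err := tnClient.WaitForTx(ctx, insertTx, time.Second*5)
if err != nil || txRes.Result.Code != uint32(kwiltypes.CodeOk) {
	log.Fatal("Batch insert was not mined successfully")
}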
TN supports granular control over stream access and visibility. Streams can be public or private, with read and write permissions configurable at the wallet level. Additionally, you can control whether other streams can compose data from your stream. For more details, refer to Stream Permissions.
The TN SDK provides comprehensive support for creating and managing both primitive and composed streams.
Primitive streams are raw data sources that can represent various types of data points. To create a primitive stream:
// Generate a unique stream ID
primitiveStreamId := util.GenerateStreamId("my-market-data-stream")
// Deploy the primitive stream
deployTx, err := tnClient.DeployStream(ctx, primitiveStreamId, types.StreamTypePrimitive)
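// NOTE: wait for deployTx to be mined (e.g. with tnClient.WaitForTx) before inserting records; see Transaction Lifecycle above.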
// Insert records into the primitive stream
primitiveActions, err := tnClient.LoadPrimitiveActions()
insertTx, err := primitiveActions.InsertRecords(ctx, []types.InsertRecordInput{
{
DataProvider: dataProviderAddress,
StreamId: primitiveStreamId.String(),
EventTime: int(time.Now().Unix()),
Value: 100.5,
},
})
Composed streams aggregate and process data from multiple primitive or other composed streams. They use a taxonomy to define how child streams are combined:
// Deploy a composed stream
composedStreamId := util.GenerateStreamId("my-composite-index")
deployTx, err := tnClient.DeployStream(ctx, composedStreamId, types.StreamTypeComposed)
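// NOTE: wait for deployTx to be mined before configuring the taxonomy; see Transaction Lifecycle above.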
// Load composed actions
composedActions, err := tnClient.LoadComposedActions()
// Set taxonomy (define how child streams are combined)
taxonomyTx, err := composedActions.InsertTaxonomy(ctx, types.Taxonomy{
ParentStream: tnClient.OwnStreamLocator(composedStreamId),
TaxonomyItems: []types.TaxonomyItem{
{
ChildStream: tnClient.OwnStreamLocator(primitiveStreamId1),
Weight: 0.6, // 60% weight
},
{
ChildStream: tnClient.OwnStreamLocator(primitiveStreamId2),
Weight: 0.4, // 40% weight
},
},
})
// Get taxonomy information for a composed stream
taxonomyParams := types.DescribeTaxonomiesParams{
Stream: tnClient.OwnStreamLocator(composedStreamId),
LatestVersion: true,
}
taxonomyItems, err := composedActions.DescribeTaxonomies(ctx, taxonomyParams)
if err != nil {
log.Printf("Failed to describe taxonomies: %v", err)
} else {
for _, item := range taxonomyItems {
fmt.Printf("Child stream: %s, Weight: %.2f\n",
item.ChildStream.StreamId.String(), item.Weight)
}
}
For a comprehensive example demonstrating stream creation, taxonomy setup, and data retrieval, see the examples/complex_stream_example/main.go file. This example shows:
- Deploying primitive streams
- Inserting records into primitive streams
- Creating a composed stream
- Setting up stream taxonomy
- Retrieving composed stream records
Key steps include:
- Generating unique stream IDs
- Deploying primitive and composed streams
- Inserting records into primitive streams
- Defining stream taxonomy
- Retrieving composed stream records
This example provides a practical walkthrough of creating and managing streams in the TRUF.NETWORK ecosystem.
A StreamLocator is a unique identifier for a stream that consists of two key components:
- StreamId: A unique identifier for the stream
- DataProvider: The Ethereum address of the stream's creator/owner
The OwnStreamLocator() method is a convenience function that automatically creates a StreamLocator using:
- The provided StreamId
- The current client's Ethereum address
Example:
// Creates a StreamLocator with:
// - The given stream ID
// - The current client's address as the data provider
streamLocator := tnClient.OwnStreamLocator(myStreamId)
This is particularly useful when you're creating and managing your own streams, as it automatically uses your client's address.
A DataProvider is the Ethereum address responsible for creating and managing a stream. When inserting records or performing operations on a stream, you need to specify the data provider's address.
To get the current client's address, use:
// Get the current client's Ethereum address
dataProviderAddress := tnClient.Address()
// Get the address as a string for use in stream operations
dataProviderAddressString := dataProviderAddress.Address()
Key differences:
- tnClient.Address() returns an EthereumAddress object
- dataProviderAddress.Address() returns the address as a string, which is used in stream operations
// Generate a stream ID
streamId := util.GenerateStreamId("my-stream")
// Deploy the stream using the current client's address
deployTx, err := tnClient.DeployStream(ctx, streamId, types.StreamTypePrimitive)
// Create a stream locator
streamLocator := tnClient.OwnStreamLocator(streamId)
// Get the data provider address
dataProvider := tnClient.Address()
// Insert a record using the data provider address
insertTx, err := primitiveActions.InsertRecords(ctx, []types.InsertRecordInput{
{
DataProvider: dataProvider.Address(),
StreamId: streamId.String(),
EventTime: int(time.Now().Unix()),
Value: 100.5,
},
})
This approach ensures that:
- Streams are uniquely identified
- Records are correctly attributed to their creator
- Stream operations are performed with the correct addressing
Stream deletion is crucial for:
- Cleaning up unused or test streams
- Managing resource consumption
- Maintaining a clean and organized stream ecosystem
Streams can be deleted using the DestroyStream() method:
// Destroy a specific stream
destroyTx, err := tnClient.DestroyStream(ctx, streamId)
if err != nil {
// Handle deletion error
log.Printf("Failed to destroy stream: %v", err)
}
// Wait for the destroy transaction to be mined
txRes, err := tnClient.WaitForTx(ctx, destroyTx, time.Second*5)
if err != nil {
log.Printf("Error waiting for stream destruction: %v", err)
} else if txRes.Result.Code != uint32(kwiltypes.CodeOk) {
log.Printf("Stream destruction failed: %s", txRes.Result.Log)
}
- Cleanup in Reverse Order
  - Delete composed streams before their child primitive streams
  - Ensures proper resource management and prevents orphaned references
- Error Handling
  - Always check for errors during stream deletion
  - Log and handle potential issues gracefully
- Deferred Deletion
  - Use defer for automatic cleanup in test or example scenarios
  - Ensures resources are freed even if an error occurs
Example of Deferred Stream Deletion:
func main() {
// Defer stream destruction
defer func() {
streamIds := []util.StreamId{
composedStreamId,
primitiveStreamId1,
primitiveStreamId2,
}
for _, streamId := range streamIds {
destroyTx, err := tnClient.DestroyStream(ctx, streamId)
if err != nil {
log.Printf("Failed to destroy stream %s: %v", streamId, err)
continue
}
// Wait for the destroy transaction
txRes, err := tnClient.WaitForTx(ctx, destroyTx, time.Second*5)
if err != nil {
log.Printf("Error waiting for destroy transaction: %v", err)
} else if txRes.Result.Code != uint32(kwiltypes.CodeOk) {
log.Printf("Destroy transaction failed for stream %s: %s", streamId, txRes.Result.Log)
}
}
}()
// Rest of the stream creation and management code
}
Important considerations:
- Stream deletion is a permanent action
- Deleted streams cannot be recovered
- Ensure you have the necessary permissions to delete a stream
- In production, implement additional safeguards before deletion
Typical occasions to delete streams:
- After completing testing
- When streams are no longer needed
- To free up resources
- As part of a stream lifecycle management strategy
By following these guidelines, you can effectively manage stream resources in the TRUF.NETWORK ecosystem.
🎯 Full Working Example: See examples/transaction-lifecycle-example/main.go for complete, runnable code demonstrating all these patterns with proper error handling.
| Operation | Method |
|---|---|
| Deploy primitive stream | tnClient.DeployStream(ctx, streamId, types.StreamTypePrimitive) |
| Deploy composed stream | tnClient.DeployStream(ctx, streamId, types.StreamTypeComposed) |
| Insert records | primitiveActions.InsertRecords(ctx, records) |
| Get stream data | composedActions.GetRecord(ctx, input) |
| Set stream taxonomy | composedActions.InsertTaxonomy(ctx, taxonomy) |
| Get stream taxonomy | composedActions.DescribeTaxonomies(ctx, params) |
| Destroy stream | tnClient.DestroyStream(ctx, streamId) |
Deploy β Insert Pattern:
// Deploy and wait
deployTx, err := tnClient.DeployStream(ctx, streamId, types.StreamTypePrimitive)
if err != nil {
return err
}
txRes, err := tnClient.WaitForTx(ctx, deployTx, time.Second*5)
if err != nil || txRes.Result.Code != uint32(kwiltypes.CodeOk) {
return fmt.Errorf("deployment failed")
}
// Now safe to insert
_, err = primitiveActions.InsertRecord(ctx, input)
Insert β Destroy β Verify Pattern:
// Insert with sync
_, err := primitiveActions.InsertRecord(ctx, input, client.WithSyncBroadcast(true))
if err != nil {
return err
}
// Destroy and wait
destroyTx, err := tnClient.DestroyStream(ctx, streamId)
if err != nil {
return err
}
txRes, err := tnClient.WaitForTx(ctx, destroyTx, time.Second*5)
if err != nil || txRes.Result.Code != uint32(kwiltypes.CodeOk) {
return fmt.Errorf("destruction failed")
}
// Verify destruction (should fail)
_, err = primitiveActions.InsertRecord(ctx, input2) // Should error
- types.StreamTypePrimitive - Raw data streams
- types.StreamTypeComposed - Aggregated streams with taxonomy
- types.DescribeTaxonomiesParams - Parameters for querying taxonomies
- types.TaxonomyItem - Individual child stream with weight
- TN-SDK Documentation
- API Reference - Complete method documentation including taxonomy operations
- Truflation Whitepaper
For additional support or questions, please open an issue or contact our support team.
The SDK-Go repository is licensed under the Apache License, Version 2.0. See LICENSE for more details.