
Commit 1d3f8bb

wip: pending writes
1 parent 90faf71 commit 1d3f8bb

File tree

2 files changed: +127 -62 lines changed


channeldb/graph.go

Lines changed: 110 additions & 51 deletions
@@ -22,6 +22,7 @@ import (
     "github.com/lightningnetwork/lnd/aliasmgr"
     "github.com/lightningnetwork/lnd/batch"
     "github.com/lightningnetwork/lnd/channeldb/models"
+    "github.com/lightningnetwork/lnd/fn"
     "github.com/lightningnetwork/lnd/input"
     "github.com/lightningnetwork/lnd/kvdb"
     "github.com/lightningnetwork/lnd/lnwire"
@@ -192,6 +193,9 @@ type ChannelGraph struct {
 
     // cacheErr returns errors from the graph cache loading goroutine.
     cacheErr chan error
+
+    // cacheUpdates stores deferred in-flight cache writes.
+    cacheUpdates *fn.ConcurrentQueue[func()]
 }
 
 // NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The
@@ -217,6 +221,7 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
     g.nodeScheduler = batch.NewTimeScheduler(
         db, nil, batchCommitInterval,
     )
+    g.cacheUpdates = fn.NewConcurrentQueue[func()](1000)
 
     // The graph cache can be turned off (e.g. for mobile users) for a
     // speed/memory usage tradeoff.
@@ -226,6 +231,7 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
         // Start populating the cache asynchronously.
         g.cacheReady = make(chan struct{})
         g.cacheErr = make(chan error, 1)
+        g.cacheUpdates.Start()
         go g.populateGraphCache()
     }
 
@@ -266,6 +272,14 @@ func (c *ChannelGraph) populateGraphCache() {
         return
     }
 
+    go func() {
+        for update := range c.cacheUpdates.ChanOut() {
+            c.cacheMu.Lock()
+            update()
+            c.cacheMu.Unlock()
+        }
+    }()
+
     if c.graphCache != nil {
         log.Debugf("Finished populating in-memory channel graph (took "+
             "%v, %s)", time.Since(startTime), c.graphCache.Stats())
@@ -289,16 +303,46 @@ func (c *ChannelGraph) populateGraphCache() {
 // This method is non-blocking and should be used to safely access the graph
 // cache from concurrent goroutines.
 func (c *ChannelGraph) getGraphCache() (*GraphCache, error) {
+    // If graph cache is not being used, return nil
     if c.graphCache == nil {
         return nil, nil
     }
 
+    // Check if there was an error during population
+    if err := c.checkCacheErr(); err != nil {
+        return nil, fmt.Errorf("graph cache population failed: %w", err)
+    }
+
+    // At this point, we know the cache exists and there were no errors
+    // We can return it regardless of whether it's fully populated or not.
+    return c.graphCache, nil
+}
+
+func (c *ChannelGraph) waitUntilGraphReady() {
+    <-c.cacheReady
+}
+
+// checkCacheErr is a helper function to check for errors in cache population
+func (c *ChannelGraph) checkCacheErr() error {
     select {
-    case <-c.cacheReady:
-        return c.graphCache, nil
     case err := <-c.cacheErr:
-        return nil, fmt.Errorf("graph cache population failed: %w", err)
+        return err
+    default:
+        return nil
+    }
+}
+
+// enqueueCacheUpdate either defers a graph cache write or writes it
+// right away, depending on the cache state.
+func (c *ChannelGraph) enqueueCacheUpdate(update func()) {
+    if c.graphCache == nil {
+        // If we're not using the graph cache, execute the update
+        update()
+        return
     }
+
+    // Otherwise, queue the update
+    c.cacheUpdates.ChanIn() <- update
 }
 
 // getChannelMap loads all channel edge policies from the database and stores
@@ -892,16 +936,15 @@ func (c *ChannelGraph) AddLightningNode(node *LightningNode,
 
     r := &batch.Request{
         Update: func(tx kvdb.RwTx) error {
-            graphCache, err := c.getGraphCache()
-            if err == nil && graphCache != nil {
-                cNode := newGraphCacheNode(
-                    node.PubKeyBytes, node.Features,
-                )
-                err := graphCache.AddNode(tx, cNode)
-                if err != nil {
-                    return err
+            c.enqueueCacheUpdate(func() {
+                graphCache, err := c.getGraphCache()
+                if err == nil && graphCache != nil {
+                    cNode := newGraphCacheNode(
+                        node.PubKeyBytes, node.Features,
+                    )
+                    graphCache.AddNode(tx, cNode)
                 }
-            }
+            })
 
             return addLightningNode(tx, node)
         },
@@ -981,10 +1024,12 @@ func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
             return ErrGraphNodeNotFound
         }
 
-        graphCache, err := c.getGraphCache()
-        if err == nil && graphCache != nil {
-            graphCache.RemoveNode(nodePub)
-        }
+        c.enqueueCacheUpdate(func() {
+            graphCache, err := c.getGraphCache()
+            if err == nil && graphCache != nil {
+                graphCache.RemoveNode(nodePub)
+            }
+        })
 
         return c.deleteLightningNode(nodes, nodePub[:])
     }, func() {})
@@ -1113,10 +1158,12 @@ func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx,
         return ErrEdgeAlreadyExist
     }
 
-    graphCache, err := c.getGraphCache()
-    if err == nil && graphCache != nil {
-        graphCache.AddChannel(edge, nil, nil)
-    }
+    c.enqueueCacheUpdate(func() {
+        graphCache, err := c.getGraphCache()
+        if err == nil && graphCache != nil {
+            graphCache.AddChannel(edge, nil, nil)
+        }
+    })
 
     // Before we insert the channel into the database, we'll ensure that
     // both nodes already exist in the channel graph. If either node
@@ -1317,10 +1364,12 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *models.ChannelEdgeInfo) error {
             return ErrEdgeNotFound
         }
 
-        graphCache, err := c.getGraphCache()
-        if err == nil && graphCache != nil {
-            graphCache.UpdateChannel(edge)
-        }
+        c.enqueueCacheUpdate(func() {
+            graphCache, err := c.getGraphCache()
+            if err == nil && graphCache != nil {
+                graphCache.UpdateChannel(edge)
+            }
+        })
 
         return putChanEdgeInfo(edgeIndex, edge, chanKey)
     }, func() {})
@@ -1565,10 +1614,12 @@ func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket,
             continue
         }
 
-        graphCache, err := c.getGraphCache()
-        if err == nil && graphCache != nil {
-            graphCache.RemoveNode(nodePubKey)
-        }
+        c.enqueueCacheUpdate(func() {
+            graphCache, err := c.getGraphCache()
+            if err == nil && graphCache != nil {
+                graphCache.RemoveNode(nodePubKey)
+            }
+        })
 
         // If we reach this point, then there are no longer any edges
         // that connect this node, so we can delete it.
@@ -2597,13 +2648,15 @@ func (c *ChannelGraph) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex,
         return err
     }
 
-    graphCache, err := c.getGraphCache()
-    if err == nil && graphCache != nil {
-        graphCache.RemoveChannel(
-            edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes,
-            edgeInfo.ChannelID,
-        )
-    }
+    c.enqueueCacheUpdate(func() {
+        graphCache, err := c.getGraphCache()
+        if err == nil && graphCache != nil {
+            graphCache.RemoveChannel(
+                edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes,
+                edgeInfo.ChannelID,
+            )
+        }
+    })
 
     // We'll also remove the entry in the edge update index bucket before
     // we delete the edges themselves so we can access their last update
@@ -2736,10 +2789,9 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
         },
         Update: func(tx kvdb.RwTx) error {
             var err error
-            graphCache, _ := c.getGraphCache()
 
             isUpdate1, err = updateEdgePolicy(
-                tx, edge, graphCache,
+                tx, edge, c,
             )
 
             // Silence ErrEdgeNotFound so that the batch can
@@ -2806,7 +2858,7 @@ func (c *ChannelGraph) updateEdgeCache(e *models.ChannelEdgePolicy,
 // true if the updated policy belongs to node1, and false if the policy belonged
 // to node2.
 func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy,
-    graphCache *GraphCache) (bool, error) {
+    c *ChannelGraph) (bool, error) {
 
     edges := tx.ReadWriteBucket(edgeBucket)
     if edges == nil {
@@ -2857,11 +2909,14 @@ func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy,
     copy(fromNodePubKey[:], fromNode)
     copy(toNodePubKey[:], toNode)
 
-    if graphCache != nil {
-        graphCache.UpdatePolicy(
-            edge, fromNodePubKey, toNodePubKey, isUpdate1,
-        )
-    }
+    c.enqueueCacheUpdate(func() {
+        graphCache, _ := c.getGraphCache()
+        if graphCache != nil {
+            graphCache.UpdatePolicy(
+                edge, fromNodePubKey, toNodePubKey, isUpdate1,
+            )
+        }
+    })
 
     return isUpdate1, nil
 }
@@ -3732,10 +3787,12 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
             "bucket: %w", err)
     }
 
-    graphCache, cacheErr := c.getGraphCache()
-    if cacheErr == nil && graphCache != nil {
-        graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
-    }
+    c.enqueueCacheUpdate(func() {
+        graphCache, cacheErr := c.getGraphCache()
+        if cacheErr == nil && graphCache != nil {
+            graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
+        }
+    })
 
     return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
 })
@@ -3825,10 +3882,12 @@ func (c *ChannelGraph) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
         }
 
         for _, edgeInfo := range edgeInfos {
-            graphCache.AddChannel(
-                edgeInfo.Info, edgeInfo.Policy1,
-                edgeInfo.Policy2,
-            )
+            c.enqueueCacheUpdate(func() {
+                graphCache.AddChannel(
+                    edgeInfo.Info, edgeInfo.Policy1,
+                    edgeInfo.Policy2,
+                )
+            })
         }
     }
 
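Taken together, the graph.go changes route every graph-cache mutation through enqueueCacheUpdate, which pushes a closure onto cacheUpdates (an fn.ConcurrentQueue[func()]) that a single goroutine, started from populateGraphCache, drains while holding cacheMu. The snippet below is a minimal, self-contained sketch of that deferred-write pattern; it uses a plain buffered channel in place of fn.ConcurrentQueue, and the pendingCache type and its names are hypothetical stand-ins, not lnd code.

// pendingwrites_sketch.go: standalone illustration of the deferred
// cache-write pattern used in this commit. All names here are
// hypothetical; only the pattern mirrors the diff above.
package main

import (
    "fmt"
    "sync"
)

// pendingCache defers writes into a queue of closures and applies them
// from a single consumer goroutine, so writers never block on the cache.
type pendingCache struct {
    mu      sync.Mutex
    entries map[string]string

    updates chan func()
    wg      sync.WaitGroup
}

func newPendingCache(queueSize int) *pendingCache {
    c := &pendingCache{
        entries: make(map[string]string),
        updates: make(chan func(), queueSize),
    }

    // Single consumer: applies queued writes in FIFO order, holding the
    // cache mutex for each one, mirroring the goroutine started in
    // populateGraphCache.
    c.wg.Add(1)
    go func() {
        defer c.wg.Done()
        for update := range c.updates {
            c.mu.Lock()
            update()
            c.mu.Unlock()
        }
    }()

    return c
}

// enqueue defers a cache write, analogous to enqueueCacheUpdate.
func (c *pendingCache) enqueue(update func()) {
    c.updates <- update
}

// stop closes the queue and waits until all pending writes are applied.
func (c *pendingCache) stop() {
    close(c.updates)
    c.wg.Wait()
}

func main() {
    c := newPendingCache(1000)

    // Writers only push closures; the consumer applies them in order.
    c.enqueue(func() { c.entries["chan-1"] = "policy-a" })
    c.enqueue(func() { c.entries["chan-1"] = "policy-b" })

    c.stop()
    fmt.Println(c.entries["chan-1"]) // policy-b
}

Because a single consumer drains the queue in enqueue order, cache updates are applied sequentially and callers are never blocked by cache population; the tradeoff is that reads may observe the cache before all queued writes have landed.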
channeldb/graph_test.go

Lines changed: 17 additions & 11 deletions
@@ -4007,14 +4007,14 @@ func TestGraphLoading(t *testing.T) {
     )
     require.NoError(t, err)
 
-    // Block until graph cache is loaded.
-    graph.getGraphCache()
-
     // Populate the graph with test data.
     const numNodes = 100
     const numChannels = 4
     _, _ = fillTestGraph(t, graph, numNodes, numChannels)
 
+    // Block until graph cache is loaded and pending updates are applied.
+    graph.waitUntilGraphReady()
+
     // Recreate the graph. This should cause the graph cache to be
     // populated.
     graphReloaded, err := NewChannelGraph(
@@ -4024,17 +4024,23 @@ func TestGraphLoading(t *testing.T) {
     )
     require.NoError(t, err)
 
-    // Block until graph cache is loaded.
-    graphReloaded.getGraphCache()
+    // Block until graph cache is ready.
+    graphReloaded.waitUntilGraphReady()
 
     // Assert that the cache content is identical.
-    require.Equal(
-        t, graph.graphCache.nodeChannels,
-        graphReloaded.graphCache.nodeChannels,
+    require.True(
+        t,
+        reflect.DeepEqual(
+            graph.graphCache.nodeChannels,
+            graphReloaded.graphCache.nodeChannels,
+        ),
     )
 
-    require.Equal(
-        t, graph.graphCache.nodeFeatures,
-        graphReloaded.graphCache.nodeFeatures,
+    require.True(
+        t,
+        reflect.DeepEqual(
+            graph.graphCache.nodeFeatures,
+            graphReloaded.graphCache.nodeFeatures,
+        ),
     )
 }
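The test now waits for the asynchronously populated cache (and its queued writes) before comparing the two caches with reflect.DeepEqual. Below is a minimal sketch of that wait-then-compare pattern, assuming a plain ready channel and ordinary maps as stand-ins for the graph cache; none of the names are lnd identifiers.

// comparecaches_sketch.go: hypothetical illustration of waiting for an
// asynchronous population step before a deep-equality comparison.
package main

import (
    "fmt"
    "reflect"
)

// waitUntilReady blocks until the ready channel is closed, mirroring
// waitUntilGraphReady, which blocks on the cacheReady channel.
func waitUntilReady(ready <-chan struct{}) {
    <-ready
}

func main() {
    ready := make(chan struct{})

    // Simulate asynchronous cache population.
    cacheA := map[string][]int{}
    go func() {
        cacheA["node-1"] = []int{1, 2}
        close(ready)
    }()

    waitUntilReady(ready)

    // A cache rebuilt from the same data should match field by field;
    // reflect.DeepEqual handles the nested maps and slices.
    cacheB := map[string][]int{"node-1": {1, 2}}
    fmt.Println(reflect.DeepEqual(cacheA, cacheB)) // true
}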
