
Embedded server: JetStream stream loses leader after restart and never recovers X is not current: monitor goroutine not running #6890


Open
thomas-maurice opened this issue May 12, 2025 · 3 comments
Labels
defect Suspected defect such as a bug or regression

Comments


thomas-maurice commented May 12, 2025

Observed behavior

Hello! I am trying to embed a NATS cluster in an application, create some streams, and so on.

When I launch my app for the first time everything works great: the cluster forms and the stream is created. However, when I restart my processes the servers rejoin the cluster, but the stream never recovers a leader and becomes unusable.

When the server tries to use CreateOrUpdateStream it ends up throwing errors like `context deadline exceeded`, which is odd because `nats server info` shows all three of my servers up and running, and I'm sure the network is not at fault because the snippet attached below runs locally (though I'm observing the same thing on a kind cluster; I'm just reproducing locally because it's easier to test).

When running `nats stream info`, the cluster section of the output looks like this:

Cluster Information:

                    Name: in-cluster-embeded
           Cluster Group: S-R2F-kq3mPdKk
                  Leader:
                 Replica: server-4222, outdated, seen 51.01s ago
                 Replica: server-4223, current, not seen
                 Replica: server-4224, current, not seen

Notice the absence of a leader.

And when running `nats stream list` I see an inconsistent number of messages stored, depending on which server answers the request.

I also notice that no new messages can be added to the stream, even when I publish to the right subjects.

I have attached a minimal example below to reproduce it.

Getting a health report of the server gives me this error:

nats --context debug server report health
╭──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                                                       Health Report                                                                      │
├─────────────┬────────────────────┬────────┬────────────────┬────────┬────────────────────────────────────────────────────────────────────────────────────┤
│ Server      │ Cluster            │ Domain │ Status         │ Type   │ Error                                                                              │
├─────────────┼────────────────────┼────────┼────────────────┼────────┼────────────────────────────────────────────────────────────────────────────────────┤
│ server-4222 │ in-cluster-embeded │        │ error (503)    │        │                                                                                    │
│             │                    │        │                │ STREAM │ JetStream stream 'app > some-stream' is not current: monitor goroutine not running │
│ server-4223 │ in-cluster-embeded │        │ error (503)    │        │                                                                                    │
│             │                    │        │                │ STREAM │ JetStream stream 'app > some-stream' is not current: monitor goroutine not running │
│ server-4224 │ in-cluster-embeded │        │ error (503)    │        │                                                                                    │
│             │                    │        │                │ STREAM │ JetStream stream 'app > some-stream' is not current: monitor goroutine not running │
├─────────────┼────────────────────┼────────┼────────────────┼────────┼────────────────────────────────────────────────────────────────────────────────────┤
│ 3           │ 1                  │        │ ok: 0 / err: 3 │        │ 3                                                                                  │
╰─────────────┴────────────────────┴────────┴────────────────┴────────┴────────────────────────────────────────────────────────────────────────────────────╯

I notice that if, instead of using the snippet below, I use server.Options{}.ProcessConfigString with a config string like this one, for example
(with different storage directories for each server):

    jetstream: enabled
    jetstream {
      store_dir: "./data/jetstream"
      max_mem: 512MB
      max_file: 1GB
    }

    http: "0.0.0.0:8222"
    listen: "0.0.0.0:4222"

    max_payload: 2MB

    accounts: {
      $SYS: {
        users: [
          {user: thomas, password: thomas}
        ]
      }

      app: {
        jetstream: enabled
        users: [
          {user: app, password: app}
        ]
      }
    }

    cluster {
      listen: "0.0.0.0:4248"
      name: "in-cluster-embeded"
      authorization {
        password: "bar"
        user: "foo"
        timeout: 5
      }

      routes = [
        nats://foo:bar@localhost:5222
        nats://foo:bar@localhost:5223
        nats://foo:bar@localhost:5224
      ]
    }

It seems to work, which is a bit wild to me since, from my understanding, the two ways of setting up the cluster are functionally the same.

Expected behavior

The stream should come back up after the nodes restart.

Server and client version

I use the server module v2.11.3.

Host environment

Tested both with the local code below and in a k8s environment; I don't think the environment is the issue here.

Steps to reproduce

Create a Go program like this one:

package main

import (
	"context"
	"flag"
	"fmt"
	"net/url"
	"os"
	"os/signal"
	"syscall"
	"time"

	natsServer "github.com/nats-io/nats-server/v2/server"
	natsClient "github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func NewNatsServer(port int) (*natsServer.Server, *natsClient.Conn, error) {
	sysAcct := natsServer.NewAccount("$SYS")
	appAcct := natsServer.NewAccount("app")

	routes := []*url.URL{
		{
			Scheme: "nats",
			Host:   "localhost:5222",
			User:   url.UserPassword("foo", "bar"),
		},
		{
			Scheme: "nats",
			Host:   "localhost:5223",
			User:   url.UserPassword("foo", "bar"),
		},
		{
			Scheme: "nats",
			Host:   "localhost:5224",
			User:   url.UserPassword("foo", "bar"),
		},
	}

	opts := &natsServer.Options{
		ServerName: fmt.Sprintf("server-%d", port),
		StoreDir:   fmt.Sprintf("./data/jetstream/%d", port),
		JetStream:  true,
		Routes:     routes,
		Port:       port,
		Cluster: natsServer.ClusterOpts{
			Name:      "in-cluster-embeded",
			Port:      port + 1000,
			Host:      "0.0.0.0",
			Advertise: fmt.Sprintf("localhost:%d", port+1000),
		},
		SystemAccount:          "$SYS",
		DisableJetStreamBanner: true,
		Users: []*natsServer.User{
			{
				Username: "thomas",
				Password: "thomas",
				Account:  sysAcct,
			},
			{
				Username: "app",
				Password: "app",
				Account:  appAcct,
			},
		},
		Accounts: []*natsServer.Account{sysAcct, appAcct},
	}

	ns, err := natsServer.NewServer(opts)
	if err != nil {
		return nil, nil, err
	}

	ns.ConfigureLogger()
	ns.Start()

	if !ns.ReadyForConnections(10 * time.Second) {
		panic("server failed to be ready for connection")
	}

	appAcct, err = ns.LookupAccount("app")
	if err != nil {
		return nil, nil, err
	}

	err = appAcct.EnableJetStream(map[string]natsServer.JetStreamAccountLimits{
		"": {
			MaxMemory:    1024 * 1024 * 1024,
			MaxStore:     -1,
			MaxStreams:   512,
			MaxConsumers: 512,
		},
	})

	if err != nil {
		return nil, nil, err
	}

	nc, err := natsClient.Connect("nats://localhost:4222", natsClient.UserInfo("app", "app"))
	if err != nil {
		return nil, nil, err
	}

	go TrySetupStreams(nc)

	return ns, nc, nil
}

func TrySetupStreams(nc *natsClient.Conn) {
	maxAttempts := 10
	for i := 0; i < maxAttempts; i++ {
		fmt.Println("attempting to create stream `some-stream`")
		js, err := jetstream.New(nc)
		if err != nil {
			panic(err)
		}

		_, err = js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
			Name:      "some-stream",
			Subjects:  []string{"some-stream.>"},
			Retention: jetstream.LimitsPolicy,
			Discard:   jetstream.DiscardOld,
			MaxAge:    time.Hour * 24 * 7,
			MaxBytes:  1024 * 1024 * 1024,
			Replicas:  2,
		})

		if err != nil {
			fmt.Println("failed to create stream: ", err)
		} else {
			fmt.Println("successfully created streams")
			return
		}

		time.Sleep(time.Second * 10)
	}
}

var (
	flagPort int
)

func init() {
	flag.IntVar(&flagPort, "port", 4442, "port to use")
}

func main() {
	flag.Parse()

	_, _, err := NewNatsServer(flagPort)
	if err != nil {
		panic(err)
	}

	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGSEGV)
	for {
		s := <-c
		switch s {
		case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
			return
		default:
			return
		}
	}
}

Then run 3 instances in 3 terminals:

go run main.go -port 4222
go run main.go -port 4223
go run main.go -port 4224

Then kill the processes a few times (the ones holding the stream's replicas) and you will see the errors described above.

@thomas-maurice added the defect label May 12, 2025
@thomas-maurice changed the title from "Embedded server: Stream loses leader after restart and never recovers" to "Embedded server: JetStream stream loses leader after restart and never recovers X is not current: monitor goroutine not running" May 12, 2025
@neilalexander (Member) commented:

The problem seems to be that you are waiting too long to call EnableJetStream for the account, i.e. it happens after ReadyForConnections. Doing it before ReadyForConnections seems to resolve the problem.

I think that by the time ReadyForConnections has fired, the metaleader has determined that the account isn't JetStream-enabled, and therefore the Raft group for the recovered stream gets torn down. Then EnableJetStream doesn't run through and check for missing nodes, hence the missing monitor goroutine.
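
A sketch of the suggested ordering, shown as a fragment of NewNatsServer from the repro above (opts and the limits map are the values from the original listing; the author's reply below has the full updated program):

```go
// Enable JetStream on the account right after Start(), and only then
// block on ReadyForConnections.
ns, err := natsServer.NewServer(opts)
if err != nil {
	return nil, nil, err
}
ns.ConfigureLogger()
ns.Start()

appAcct, err = ns.LookupAccount("app")
if err != nil {
	return nil, nil, err
}
err = appAcct.EnableJetStream(map[string]natsServer.JetStreamAccountLimits{
	"": {MaxMemory: 1024 * 1024 * 1024, MaxStore: -1, MaxStreams: 512, MaxConsumers: 512},
})
if err != nil {
	return nil, nil, err
}

// Wait for readiness only once the account is JetStream-enabled.
if !ns.ReadyForConnections(10 * time.Second) {
	panic("server failed to be ready for connection")
}
```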

@ripienaar (Contributor) commented:

It does not seem possible to do it the way you want due to how the APIs are structured; your best bet is to template out a configuration file into a temp file and load it at runtime.
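
For example, a minimal sketch of that approach, assuming the templated config ends up in a Go string (serverFromConfig is a hypothetical helper name, and the config passed in main is a trivial stand-in for the one from the issue):

```go
package main

import (
	"os"

	natsServer "github.com/nats-io/nats-server/v2/server"
)

// serverFromConfig renders a config string into a temporary file and
// builds an embedded server from it, exactly as if it were a regular
// nats-server configuration file.
func serverFromConfig(config string) (*natsServer.Server, error) {
	f, err := os.CreateTemp("", "nats-*.conf")
	if err != nil {
		return nil, err
	}
	defer os.Remove(f.Name()) // the file is only needed while parsing

	if _, err := f.WriteString(config); err != nil {
		return nil, err
	}
	if err := f.Close(); err != nil {
		return nil, err
	}

	opts := &natsServer.Options{}
	if err := opts.ProcessConfigFile(f.Name()); err != nil {
		return nil, err
	}
	return natsServer.NewServer(opts)
}

func main() {
	ns, err := serverFromConfig(`listen: "127.0.0.1:4222"`)
	if err != nil {
		panic(err)
	}
	ns.Start()
	defer ns.Shutdown()
}
```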

The problem comes when trying to add accounts and users. If you don't need to connect to this server from remote networks, I'd suggest just using $G and $SYS, making embedded connections to it, and disabling the client ports.
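
A minimal sketch of that variant, assuming no remote clients are needed: DontListen disables the client listener, and nats.InProcessServer dials the embedded server in memory (the server name and stripped-down options here are illustrative):

```go
package main

import (
	"time"

	natsServer "github.com/nats-io/nats-server/v2/server"
	natsClient "github.com/nats-io/nats.go"
)

func main() {
	opts := &natsServer.Options{
		ServerName: "embedded", // illustrative name
		JetStream:  true,
		DontListen: true, // no client port; in-process connections only
	}

	ns, err := natsServer.NewServer(opts)
	if err != nil {
		panic(err)
	}
	ns.Start()
	if !ns.ReadyForConnections(10 * time.Second) {
		panic("server not ready")
	}

	// Connect straight to the embedded server, bypassing TCP entirely;
	// with no accounts configured, clients land in the default $G account.
	nc, err := natsClient.Connect("", natsClient.InProcessServer(ns))
	if err != nil {
		panic(err)
	}
	defer nc.Close()
}
```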

@thomas-maurice (Author) commented:

Hey hey!

Thanks a lot for the reply. I have tried calling EnableJetStream right after ns.Start() and it does not seem to change anything for me (still the same errors as before).

I think I will just use the server.Options{}.ProcessConfigString method to configure my server then, thanks a lot :)

For reference, this is the updated version of the code, calling `EnableJetStream` right after `Start`:
package main

import (
	"context"
	"flag"
	"fmt"
	"net/url"
	"os"
	"os/signal"
	"syscall"
	"time"

	natsServer "github.com/nats-io/nats-server/v2/server"
	natsClient "github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func NewNatsServer(port int) (*natsServer.Server, *natsClient.Conn, error) {
	sysAcct := natsServer.NewAccount("$SYS")
	appAcct := natsServer.NewAccount("app")

	routes := []*url.URL{
		{
			Scheme: "nats",
			Host:   "localhost:5222",
			User:   url.UserPassword("foo", "bar"),
		},
		{
			Scheme: "nats",
			Host:   "localhost:5223",
			User:   url.UserPassword("foo", "bar"),
		},
		{
			Scheme: "nats",
			Host:   "localhost:5224",
			User:   url.UserPassword("foo", "bar"),
		},
	}

	opts := &natsServer.Options{
		ServerName: fmt.Sprintf("server-%d", port),
		StoreDir:   fmt.Sprintf("./data/jetstream/%d", port),
		JetStream:  true,
		Routes:     routes,
		Port:       port,
		Cluster: natsServer.ClusterOpts{
			Name:      "in-cluster-embeded",
			Port:      port + 1000,
			Host:      "0.0.0.0",
			Advertise: fmt.Sprintf("localhost:%d", port+1000),
		},
		SystemAccount:          "$SYS",
		DisableJetStreamBanner: true,
		Users: []*natsServer.User{
			{
				Username: "thomas",
				Password: "thomas",
				Account:  sysAcct,
			},
			{
				Username: "app",
				Password: "app",
				Account:  appAcct,
			},
		},
		Accounts: []*natsServer.Account{sysAcct, appAcct},
	}

	ns, err := natsServer.NewServer(opts)
	if err != nil {
		return nil, nil, err
	}

	ns.ConfigureLogger()
	ns.Start()

	appAcct, err = ns.LookupAccount("app")
	if err != nil {
		return nil, nil, err
	}

	err = appAcct.EnableJetStream(map[string]natsServer.JetStreamAccountLimits{
		"": {
			MaxMemory:    1024 * 1024 * 1024,
			MaxStore:     -1,
			MaxStreams:   512,
			MaxConsumers: 512,
		},
	})

	if err != nil {
		return nil, nil, err
	}

	if !ns.ReadyForConnections(10 * time.Second) {
		panic("server failed to be ready for connection")
	}

	nc, err := natsClient.Connect("nats://localhost:4222", natsClient.UserInfo("app", "app"))
	if err != nil {
		return nil, nil, err
	}

	go TrySetupStreams(nc)

	return ns, nc, nil
}

func TrySetupStreams(nc *natsClient.Conn) {
	maxAttempts := 10
	for i := 0; i < maxAttempts; i++ {
		fmt.Println("attempting to create stream `some-stream`")
		js, err := jetstream.New(nc)
		if err != nil {
			panic(err)
		}

		_, err = js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
			Name:      "some-stream",
			Subjects:  []string{"some-stream.>"},
			Retention: jetstream.LimitsPolicy,
			Discard:   jetstream.DiscardOld,
			MaxAge:    time.Hour * 24 * 7,
			MaxBytes:  1024 * 1024 * 1024,
			Replicas:  3,
		})

		if err != nil {
			fmt.Println("failed to create stream: ", err)
		} else {
			fmt.Println("successfully created streams")
			return
		}

		time.Sleep(time.Second * 10)
	}
}

var (
	flagPort int
)

func init() {
	flag.IntVar(&flagPort, "port", 4442, "port to use")
}

func main() {
	flag.Parse()

	_, _, err := NewNatsServer(flagPort)
	if err != nil {
		panic(err)
	}

	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGSEGV)
	for {
		s := <-c
		switch s {
		case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
			return
		default:
			return
		}
	}
}
