diff --git a/Dockerfile b/Dockerfile index d93d68f8a..8212692d0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -52,13 +52,6 @@ COPY kubectl-fdb/ kubectl-fdb/ # Build RUN CGO_ENABLED=1 GOOS=linux GOARCH=${TARGETARCH} GO111MODULE=on make manager plugin-go -# Create user and group here since we don't have the tools -# in distroless -RUN groupadd --gid 4059 fdb && \ - useradd --gid 4059 --uid 4059 --create-home --shell /bin/bash fdb && \ - mkdir -p /var/log/fdb && \ - touch /var/log/fdb/.keep - FROM docker.io/rockylinux/rockylinux:9.5-minimal ARG FDB_VERSION @@ -95,11 +88,14 @@ RUN set -eux && \ rpm -i foundationdb-clients-${FDB_VERSION}-1.${FDB_OS}.${FDB_ARCH}.rpm --excludepath=/usr/bin --excludepath=/usr/lib/foundationdb/backup_agent && \ rm foundationdb-clients-${FDB_VERSION}-1.${FDB_OS}.${FDB_ARCH}.rpm foundationdb-clients-${FDB_VERSION}-1.${FDB_OS}.${FDB_ARCH}.rpm.sha256 -COPY --from=builder /etc/passwd /etc/passwd -COPY --from=builder /etc/group /etc/group + +RUN groupadd --gid 4059 fdb && \ + useradd --gid 4059 --uid 4059 --shell /usr/sbin/nologin fdb && \ + mkdir -p /var/log/fdb && \ + touch /var/log/fdb/.keep + COPY --chown=fdb:fdb --from=builder /workspace/bin/manager . COPY --chown=fdb:fdb --from=builder /workspace/bin/kubectl-fdb /usr/local/bin/kubectl-fdb -COPY --chown=fdb:fdb --from=builder /var/log/fdb/.keep /var/log/fdb/.keep # Set to the numeric UID of fdb user to satisfy PodSecurityPolices which enforce runAsNonRoot USER 4059 diff --git a/e2e/Makefile b/e2e/Makefile index 88f798acb..b15ec4a62 100644 --- a/e2e/Makefile +++ b/e2e/Makefile @@ -28,6 +28,7 @@ STORAGE_ENGINE?= DUMP_OPERATOR_STATE?=true SEAWEEDFS_IMAGE?=chrislusf/seaweedfs:3.73 NODE_SELECTOR?= +DATA_LOADER_IMAGE?= # Defines the cloud provider used for the underlying Kubernetes cluster. Currently only kind is support, other cloud providers # should still work but this test framework has no special cases for those. 
CLOUD_PROVIDER?= @@ -177,6 +178,7 @@ endif --fdb-image="$(FDB_IMAGE)" \ --sidecar-image="$(SIDECAR_IMAGE)" \ --operator-image="$(OPERATOR_IMAGE)" \ + --data-loader-image=$(DATA_LOADER_IMAGE) \ --registry="$(REGISTRY)" \ --fdb-version="$(FDB_VERSION)" \ --cleanup=$(CLEANUP) \ diff --git a/e2e/fixtures/fdb_data_loader.go b/e2e/fixtures/fdb_data_loader.go index 17852add5..da76de690 100644 --- a/e2e/fixtures/fdb_data_loader.go +++ b/e2e/fixtures/fdb_data_loader.go @@ -29,11 +29,10 @@ import ( "text/template" "time" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - "github.com/onsi/gomega" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -46,7 +45,7 @@ const ( // The name of the data loader Job. dataLoaderName = "fdb-data-loader" - // For now we only load 2GB into the cluster, we can increase this later if we want. + // We load 2GB into the cluster, we can increase this later if we want. dataLoaderJob = `apiVersion: batch/v1 kind: Job metadata: @@ -66,9 +65,11 @@ spec: name: {{ .Name }} # This configuration will load ~1GB per data loader. 
args: - - --keys=1000000 - - --batch-size=50 - - --value-size=1000 + - --keys={{ .Config.Keys }} + - --batch-size={{ .Config.BatchSize }} + - --value-size={{ .Config.ValueSize }} + - --cluster-file-directory=/var/dynamic/fdb + - --read-values={{ .Config.ReadValues }} env: - name: FDB_CLUSTER_FILE value: /var/dynamic/fdb/fdb.cluster @@ -86,18 +87,22 @@ spec: value: /var/dynamic/fdb/primary/lib - name: FDB_NETWORK_OPTION_TRACE_LOG_GROUP value: {{ .Name }} + - name: FDB_NETWORK_OPTION_TRACE_ENABLE + value: "/tmp/fdb-trace-logs" - name: FDB_NETWORK_OPTION_EXTERNAL_CLIENT_DIRECTORY value: /var/dynamic/fdb/libs - - name: PYTHONUNBUFFERED - value: "on" + - name: FDB_NETWORK_OPTION_TRACE_FORMAT + value: json + - name: FDB_NETWORK_OPTION_CLIENT_THREADS_PER_VERSION + value: "10" volumeMounts: - - name: config-map - mountPath: /var/dynamic-conf - name: fdb-libs mountPath: /var/dynamic/fdb - name: fdb-certs mountPath: /tmp/fdb-certs readOnly: true + - name: fdb-logs + mountPath: /tmp/fdb-trace-logs resources: requests: cpu: "1" @@ -121,9 +126,9 @@ spec: runAsGroup: 0 # Install this library in a special location to force the operator to use it as the primary library. {{ if .CopyAsPrimary }} - - name: foundationdb-kubernetes-init-7-1-primary + - name: foundationdb-kubernetes-init-primary image: {{ .Image }} - imagePullPolicy: {{ .ImagePullPolicy }} + imagePullPolicy: Always args: # Note that we are only copying a library, rather than copying any binaries. 
- "--copy-library" @@ -143,9 +148,9 @@ spec: - /bin/bash args: - -c - - mkdir -p /var/dynamic/fdb/libs && {{ range $index, $version := .SidecarVersions -}} cp /var/dynamic/fdb/{{ .FDBVersion.Compact }}/lib/libfdb_c.so /var/dynamic/fdb/libs/libfdb_{{ .FDBVersion.Compact }}_c.so && {{ end }} cp /var/dynamic-conf/fdb.cluster /var/dynamic/fdb/fdb.cluster + - mkdir -p /var/dynamic/fdb/libs && {{ range $index, $version := .SidecarVersions -}} cp /var/dynamic/fdb/{{ .FDBVersion.Compact }}/lib/libfdb_c.so /var/dynamic/fdb/libs/libfdb_{{ .FDBVersion.Compact }}_c.so && {{ end }} cp /var/dynamic-conf/*.cluster /var/dynamic/fdb/ volumeMounts: - - name: config-map + - name: cluster-files mountPath: /var/dynamic-conf - name: fdb-libs mountPath: /var/dynamic/fdb @@ -154,19 +159,26 @@ spec: readOnly: true restartPolicy: Never volumes: - - name: config-map - configMap: - name: {{ .ClusterName }}-config - items: - - key: cluster-file - path: fdb.cluster + - name: cluster-files + projected: + sources: +{{- range $index, $clusterName := .ClusterNames }} + - name: {{ $clusterName }}-config + configMap: + name: {{ $clusterName }}-config + items: + - key: cluster-file + path: {{ $clusterName }}.cluster +{{- end }} - name: fdb-libs emptyDir: {} + - name: fdb-logs + emptyDir: {} - name: fdb-certs secret: secretName: {{ .SecretName }}` - // For now we only load 2GB into the cluster, we can increase this later if we want. + // For now, we only load 2GB into the cluster, we can increase this later if we want. dataLoaderJobUnifiedImage = `apiVersion: batch/v1 kind: Job metadata: @@ -186,9 +198,11 @@ spec: name: {{ .Name }} # This configuration will load ~1GB per data loader. 
args: - - --keys=1000000 - - --batch-size=50 - - --value-size=1000 + - --keys={{ .Config.Keys }} + - --batch-size={{ .Config.BatchSize }} + - --value-size={{ .Config.ValueSize }} + - --cluster-file-directory=/var/dynamic/fdb + - --read-values={{ .Config.ReadValues }} env: - name: FDB_CLUSTER_FILE value: /var/dynamic/fdb/fdb.cluster @@ -202,31 +216,35 @@ spec: # Consider remove this option once 6.3 is no longer being used. - name: FDB_NETWORK_OPTION_IGNORE_EXTERNAL_CLIENT_FAILURES value: "" + - name: FDB_NETWORK_OPTION_TRACE_ENABLE + value: "/tmp/fdb-trace-logs" - name: LD_LIBRARY_PATH value: /var/dynamic/fdb - name: FDB_NETWORK_OPTION_TRACE_LOG_GROUP value: {{ .Name }} - name: FDB_NETWORK_OPTION_EXTERNAL_CLIENT_DIRECTORY value: /var/dynamic/fdb - - name: PYTHONUNBUFFERED - value: "on" + - name: FDB_NETWORK_OPTION_TRACE_FORMAT + value: json + - name: FDB_NETWORK_OPTION_CLIENT_THREADS_PER_VERSION + value: "10" volumeMounts: - - name: config-map - mountPath: /var/dynamic-conf - name: fdb-libs mountPath: /var/dynamic/fdb - name: fdb-certs mountPath: /tmp/fdb-certs readOnly: true + - name: fdb-logs + mountPath: /tmp/fdb-trace-logs resources: requests: cpu: "1" memory: 4Gi initContainers: - {{ range $index, $version := .SidecarVersions }} +{{- range $index, $version := .SidecarVersions }} - name: foundationdb-kubernetes-init-{{ $index }} image: {{ .Image }} - imagePullPolicy: {{ .ImagePullPolicy }} + imagePullPolicy: Always args: - --mode - init @@ -234,20 +252,20 @@ spec: - /var/output-files - --copy-library - "{{ .FDBVersion.Compact }}" -{{ if .CopyAsPrimary }} +{{- if .CopyAsPrimary }} - --copy-primary-library - "{{ .FDBVersion.Compact }}" -{{ end }} +{{- end }} volumeMounts: - name: fdb-libs mountPath: /var/output-files securityContext: runAsUser: 0 runAsGroup: 0 -{{ if .CopyAsPrimary }} +{{- if .CopyAsPrimary }} - name: foundationdb-kubernetes-init-cluster-file image: {{ .Image }} - imagePullPolicy: {{ .ImagePullPolicy }} + imagePullPolicy: Always args: - --mode - 
init @@ -255,36 +273,45 @@ spec: - /var/dynamic-conf - --output-dir - /var/output-files +{{- range $index, $clusterName := $.ClusterNames }} - --copy-file - - fdb.cluster + - {{ $clusterName }}.cluster - --require-not-empty - - fdb.cluster + - {{ $clusterName }}.cluster +{{- end }} volumeMounts: - name: fdb-libs mountPath: /var/output-files - - name: config-map - mountPath: /var/dynamic-conf + - name: cluster-files + mountPath: /var/dynamic-conf/ securityContext: runAsUser: 0 runAsGroup: 0 -{{ end }} - {{ end }} +{{- end }} +{{- end }} restartPolicy: Never volumes: - - name: config-map - configMap: - name: {{ .ClusterName }}-config - items: - - key: cluster-file - path: fdb.cluster + - name: cluster-files + projected: + sources: +{{- range $index, $clusterName := .ClusterNames }} + - name: {{ $clusterName }}-config + configMap: + name: {{ $clusterName }}-config + items: + - key: cluster-file + path: {{ $clusterName }}.cluster +{{- end }} - name: fdb-libs emptyDir: {} + - name: fdb-logs + emptyDir: {} - name: fdb-certs secret: secretName: {{ .SecretName }}` ) -// dataLoaderConfig represents the configuration of the Dataloader Job. +// dataLoaderConfig represents the configuration of the data-loader Job. type dataLoaderConfig struct { // Name of the data loader Job. Name string @@ -294,32 +321,71 @@ type dataLoaderConfig struct { SidecarVersions []SidecarConfig // Namespace represents the namespace for the Deployment and all associated resources Namespace string - // ClusterName the name of the cluster to load data into. - ClusterName string + // ClusterNames the names of the clusters to load data into. + ClusterNames []string // SecretName represents the Kubernetes secret that contains the certificates for communicating with the FoundationDB // cluster. SecretName string + // Config defines the workload configuration. 
+ Config *WorkloadConfig } -func (factory *Factory) getDataLoaderConfig(cluster *FdbCluster) *dataLoaderConfig { +// WorkloadConfig defines the workload configuration. +type WorkloadConfig struct { + // Keys defines how many keys should be written by the data loader. + Keys int + // BatchSize defines how many keys should be inserted per batch (transaction). + BatchSize int + // ValueSize defines the value size in bytes per key-value pair. + ValueSize int + // ReadValues defines if the data loader should be reading the written values again to add some read load. + ReadValues bool +} + +func (config *WorkloadConfig) setDefaults() { + if config.Keys == 0 { + config.Keys = 1000000 + } + + if config.BatchSize == 0 { + config.BatchSize = 1000 + } + + if config.ValueSize == 0 { + config.ValueSize = 1000 + } +} + +func (factory *Factory) getDataLoaderConfig(clusters []*FdbCluster, config *WorkloadConfig) *dataLoaderConfig { + if config == nil { + config = &WorkloadConfig{} + } + + config.setDefaults() + + clusterNames := make([]string, 0, len(clusters)) + for _, cluster := range clusters { + clusterNames = append(clusterNames, cluster.Name()) + } return &dataLoaderConfig{ Name: dataLoaderName, Image: factory.GetDataLoaderImage(), - Namespace: cluster.Namespace(), + Namespace: clusters[0].Namespace(), SidecarVersions: factory.GetSidecarConfigs(), - ClusterName: cluster.Name(), + ClusterNames: clusterNames, SecretName: factory.GetSecretName(), + Config: config, } } // CreateDataLoaderIfAbsent will create the data loader for the provided cluster and load some random data into the cluster. func (factory *Factory) CreateDataLoaderIfAbsent(cluster *FdbCluster) { - factory.CreateDataLoaderIfAbsentWithWait(cluster, true) + factory.CreateDataLoaderIfAbsentWithWait(cluster, nil, true) } -// CreateDataLoaderIfAbsentWithWait will create the data loader for the provided cluster and load some random data into the cluster. 
-// If wait is true, the method will wait until the data loader has finished. -func (factory *Factory) CreateDataLoaderIfAbsentWithWait(cluster *FdbCluster, wait bool) { +// CreateDataLoaderIfAbsentWithWaitForMultipleClusters will create a data loader configuration that loads data into multiple +// FoundationDB clusters. +func (factory *Factory) CreateDataLoaderIfAbsentWithWaitForMultipleClusters(clusters []*FdbCluster, config *WorkloadConfig, wait bool) { if !factory.options.enableDataLoading { return } @@ -331,7 +397,7 @@ func (factory *Factory) CreateDataLoaderIfAbsentWithWait(cluster *FdbCluster, wa t, err := template.New("dataLoaderJob").Parse(dataLoaderJobTemplate) gomega.Expect(err).NotTo(gomega.HaveOccurred()) buf := bytes.Buffer{} - gomega.Expect(t.Execute(&buf, factory.getDataLoaderConfig(cluster))).NotTo(gomega.HaveOccurred()) + gomega.Expect(t.Execute(&buf, factory.getDataLoaderConfig(clusters, config))).NotTo(gomega.HaveOccurred()) decoder := yamlutil.NewYAMLOrJSONDecoder(&buf, 100000) for { var rawObj runtime.RawExtension @@ -359,8 +425,14 @@ func (factory *Factory) CreateDataLoaderIfAbsentWithWait(cluster *FdbCluster, wa return } - factory.WaitUntilDataLoaderIsDone(cluster) - factory.DeleteDataLoader(cluster) + factory.WaitUntilDataLoaderIsDone(clusters[0]) + factory.DeleteDataLoader(clusters[0]) +} + +// CreateDataLoaderIfAbsentWithWait will create the data loader for the provided cluster and load some random data into the cluster. +// If wait is true, the method will wait until the data loader has finished. 
+func (factory *Factory) CreateDataLoaderIfAbsentWithWait(cluster *FdbCluster, config *WorkloadConfig, wait bool) { + factory.CreateDataLoaderIfAbsentWithWaitForMultipleClusters([]*FdbCluster{cluster}, config, wait) } // DeleteDataLoader will delete the data loader job diff --git a/e2e/test_operator_ha/operator_ha_test.go b/e2e/test_operator_ha/operator_ha_test.go index e68332f9a..8488f8ebb 100644 --- a/e2e/test_operator_ha/operator_ha_test.go +++ b/e2e/test_operator_ha/operator_ha_test.go @@ -337,7 +337,7 @@ var _ = Describe("Operator HA tests", Label("e2e", "pr"), func() { }) // TODO (johscheuer): Allow to have this as a long running task until the test is done. - factory.CreateDataLoaderIfAbsentWithWait(fdbCluster.GetPrimary(), false) + factory.CreateDataLoaderIfAbsentWithWait(fdbCluster.GetPrimary(), nil, false) time.Sleep(1 * time.Minute) log.Println("replacedPod", replacedPod.Name, "useLocalitiesForExclusion", fdbCluster.GetPrimary().GetCluster().UseLocalitiesForExclusion()) @@ -454,7 +454,7 @@ var _ = Describe("Operator HA tests", Label("e2e", "pr"), func() { }) // TODO (johscheuer): Allow to have this as a long running task until the test is done. 
- factory.CreateDataLoaderIfAbsentWithWait(fdbCluster.GetPrimary(), false) + factory.CreateDataLoaderIfAbsentWithWait(fdbCluster.GetPrimary(), nil, false) time.Sleep(1 * time.Minute) log.Println("replacedPod", replacedPod.Name, "useLocalitiesForExclusion", fdbCluster.GetPrimary().GetCluster().UseLocalitiesForExclusion()) diff --git a/sample-apps/data-loader/Dockerfile b/sample-apps/data-loader/Dockerfile index f1343d65c..a52e79366 100644 --- a/sample-apps/data-loader/Dockerfile +++ b/sample-apps/data-loader/Dockerfile @@ -1,23 +1,90 @@ -FROM docker.io/library/python:3.13-slim +ARG FDB_VERSION=7.1.67 +ARG FDB_WEBSITE=https://github.com/apple/foundationdb/releases/download +# Build the manager binary +FROM docker.io/library/golang:1.24.4 AS builder + +ARG FDB_VERSION +ARG FDB_WEBSITE +ARG TARGETARCH +ARG TAG="latest" + +RUN set -eux && \ + if [ "$TARGETARCH" = "amd64" ]; then \ + FDB_ARCH=amd64; \ + elif [ "$TARGETARCH" = "arm64" ]; then \ + FDB_ARCH=aarch64; \ + if [ "${FDB_VERSION%.*}" = "7.1" ]; then \ + FDB_VERSION="7.3.63"; \ + fi; \ + else \ + echo "ERROR: unsupported architecture $TARGETARCH" 1>&2; \ + exit 1; \ + fi; \ + curl --fail -L "${FDB_WEBSITE}/${FDB_VERSION}/foundationdb-clients_${FDB_VERSION}-1_${FDB_ARCH}.deb" -o foundationdb-clients_${FDB_VERSION}-1_${FDB_ARCH}.deb && \ + curl --fail -L "${FDB_WEBSITE}/${FDB_VERSION}/foundationdb-clients_${FDB_VERSION}-1_${FDB_ARCH}.deb.sha256" -o foundationdb-clients_${FDB_VERSION}-1_${FDB_ARCH}.deb.sha256 && \ + sha256sum -c foundationdb-clients_${FDB_VERSION}-1_${FDB_ARCH}.deb.sha256 && \ + dpkg -i foundationdb-clients_${FDB_VERSION}-1_${FDB_ARCH}.deb && \ + rm foundationdb-clients_${FDB_VERSION}-1_${FDB_ARCH}.deb foundationdb-clients_${FDB_VERSION}-1_${FDB_ARCH}.deb.sha256 + +WORKDIR /workspace +# Copy the Go Modules manifests +COPY go.mod go.mod +COPY go.sum go.sum +# cache deps before building and copying source so that we don't need to re-download as much +# and so that source changes don't invalidate our 
downloaded layer +RUN go mod download -x + +# Copy the go source +COPY main.go main.go + +# Build +RUN CGO_ENABLED=1 GOOS=linux GOARCH=${TARGETARCH} GO111MODULE=on go build -o /workspace/bin/data-loader main.go + +FROM docker.io/rockylinux/rockylinux:9.5-minimal + +ARG FDB_VERSION +ARG FDB_WEBSITE ARG TARGETARCH -COPY app.py /usr/local/bin +VOLUME /usr/lib/fdb + +WORKDIR / + +RUN set -eux && \ + if [ "$TARGETARCH" = "amd64" ]; then \ + FDB_ARCH=x86_64; \ + elif [ "$TARGETARCH" = "arm64" ]; then \ + FDB_ARCH=aarch64; \ + if [ "${FDB_VERSION%.*}" = "7.1" ]; then \ + FDB_VERSION="7.3.63"; \ + fi; \ + else \ + echo "ERROR: unsupported architecture $TARGETARCH" 1>&2; \ + exit 1; \ + fi; \ + if [ "${FDB_VERSION%.*}" = "7.1" ]; then \ + # FDB 7.1 published the client packages for el7, 7.3 and newer uses el9. + FDB_OS=el7; \ + else \ + FDB_OS=el9; \ + fi; \ + curl --fail -L "${FDB_WEBSITE}/${FDB_VERSION}/foundationdb-clients-${FDB_VERSION}-1.${FDB_OS}.${FDB_ARCH}.rpm" -o foundationdb-clients-${FDB_VERSION}-1.${FDB_OS}.${FDB_ARCH}.rpm && \ + curl --fail -L "${FDB_WEBSITE}/${FDB_VERSION}/foundationdb-clients-${FDB_VERSION}-1.${FDB_OS}.${FDB_ARCH}.rpm.sha256" -o foundationdb-clients-${FDB_VERSION}-1.${FDB_OS}.${FDB_ARCH}.rpm.sha256 && \ + microdnf install -y glibc pkg-config && \ + microdnf clean all && \ + sha256sum -c foundationdb-clients-${FDB_VERSION}-1.${FDB_OS}.${FDB_ARCH}.rpm.sha256 && \ + rpm -i foundationdb-clients-${FDB_VERSION}-1.${FDB_OS}.${FDB_ARCH}.rpm --excludepath=/usr/bin --excludepath=/usr/lib/foundationdb/backup_agent && \ + rm foundationdb-clients-${FDB_VERSION}-1.${FDB_OS}.${FDB_ARCH}.rpm foundationdb-clients-${FDB_VERSION}-1.${FDB_OS}.${FDB_ARCH}.rpm.sha256 -RUN pip install foundationdb==7.1.67 RUN groupadd --gid 4059 fdb && \ - useradd --gid 4059 --uid 4059 --shell /usr/sbin/nologin fdb + useradd --gid 4059 --uid 4059 --shell /usr/sbin/nologin fdb && \ + mkdir -p /var/log/fdb && \ + touch /var/log/fdb/.keep -RUN apt-get update && \ - apt-get install -y 
--no-install-recommends curl && \ - curl -L https://github.com/krallin/tini/releases/download/v0.19.0/tini-${TARGETARCH} -o tini-${TARGETARCH} && \ - echo "93dcc18adc78c65a028a84799ecf8ad40c936fdfc5f2a57b1acda5a8117fa82c tini-amd64\n07952557df20bfd2a95f9bef198b445e006171969499a1d361bd9e6f8e5e0e81 tini-arm64" > tini-sha.txt && \ - sha256sum --quiet --ignore-missing -c tini-sha.txt && \ - chmod +x tini-${TARGETARCH} && \ - mv tini-${TARGETARCH} /usr/bin/tini && \ - rm -rf /tmp/* +COPY --chown=fdb:fdb --from=builder /workspace/bin/data-loader /usr/local/bin/data-loader # Set to the numeric UID of fdb user to satisfy PodSecurityPolices which enforce runAsNonRoot USER 4059 -ENTRYPOINT [ "/usr/bin/tini", "-g", "--", "python", "/usr/local/bin/app.py" ] +ENTRYPOINT ["/usr/local/bin/data-loader"] diff --git a/sample-apps/data-loader/app.py b/sample-apps/data-loader/app.py deleted file mode 100755 index 7a6323e86..000000000 --- a/sample-apps/data-loader/app.py +++ /dev/null @@ -1,54 +0,0 @@ -#! /usr/bin/python - -""" -This file provides a sample app for loading data into FDB. 
- -To use it to load data into one of the sample clusters in this repo, -you can build the image by running `docker build -t fdb-data-loader sample-apps/data-loader`, -and then run the data loader by running `kubectl apply -f sample-apps/data-loader/job.yaml` -""" - -import argparse -import random -import uuid -import fdb - -fdb.api_version(600) - - -@fdb.transactional -def write_batch(tr, batch_size, value_size): - prefix = uuid.uuid4() - for index in range(1, batch_size + 1): - tr[fdb.tuple.pack((prefix, index))] = random.randbytes(value_size) - - -def load_data(keys, batch_size, value_size): - batch_count = int(keys / batch_size) - - db = fdb.open() - for batch in range(1, batch_count + 1): - print("Writing batch %d" % batch) - write_batch(db, batch_size, value_size) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Load random data into FDB") - parser.add_argument( - "--keys", type=int, help="Number of keys to generate", default=100000 - ) - parser.add_argument( - "--batch-size", - type=int, - help="Number of keys to write in each transaction", - default=10, - ) - parser.add_argument( - "--value-size", - type=int, - help="Number of bytes to include in each value", - default=1000, - ) - args = parser.parse_args() - - load_data(args.keys, args.batch_size, args.value_size) diff --git a/sample-apps/data-loader/go.mod b/sample-apps/data-loader/go.mod new file mode 100644 index 000000000..abc4d4cfe --- /dev/null +++ b/sample-apps/data-loader/go.mod @@ -0,0 +1,11 @@ +module github.com/FoundationDB/fdb-data-loader + +go 1.24.0 + +toolchain go1.24.4 + +require ( + // Binding version for 7.1.67 + github.com/apple/foundationdb/bindings/go v0.0.0-20250115161953-f1ab8147ed1c + github.com/google/uuid v1.6.0 +) diff --git a/sample-apps/data-loader/go.sum b/sample-apps/data-loader/go.sum new file mode 100644 index 000000000..32c2327c9 --- /dev/null +++ b/sample-apps/data-loader/go.sum @@ -0,0 +1,4 @@ +github.com/apple/foundationdb/bindings/go 
v0.0.0-20250115161953-f1ab8147ed1c h1:Nnun3T50beIpO6YKDZInHuZMgnNtYJafSlQAvkXwOWc= +github.com/apple/foundationdb/bindings/go v0.0.0-20250115161953-f1ab8147ed1c/go.mod h1:OMVSB21p9+xQUIqlGizHPZfjK+SHws1ht+ZytVDoz9U= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/sample-apps/data-loader/job.yaml b/sample-apps/data-loader/job.yaml index 01e474084..22587c7ec 100644 --- a/sample-apps/data-loader/job.yaml +++ b/sample-apps/data-loader/job.yaml @@ -24,15 +24,20 @@ spec: mountPath: /var/dynamic-conf initContainers: - name: foundationdb-kubernetes-init - image: foundationdb/foundationdb-kubernetes-sidecar:6.3.3-1 + image: foundationdb/fdb-kubernetes-monitor:7.1.57 args: - "--copy-file" - "fdb.cluster" - "--copy-library" - - "6.3" + - "7.1" - "--copy-library" - - "6.2" - - "--init-mode" + - "6.3" + - "--mode" + - "init" + - "--input-dir" + - "/var/dynamic-conf" + - "--output-dir" + - "/var/output-files" - "--require-not-empty" - "fdb.cluster" volumeMounts: diff --git a/sample-apps/data-loader/main.go b/sample-apps/data-loader/main.go new file mode 100644 index 000000000..199a5acf3 --- /dev/null +++ b/sample-apps/data-loader/main.go @@ -0,0 +1,152 @@ +package main + +import ( + "context" + "encoding/binary" + "flag" + "log" + "math/rand/v2" + "os" + "os/signal" + "path" + "strings" + "sync" + "syscall" + "time" + + "github.com/apple/foundationdb/bindings/go/src/fdb" + "github.com/apple/foundationdb/bindings/go/src/fdb/tuple" + "github.com/google/uuid" +) + +// initRandomGenerator setup the random generator to generate the values. 
+func initRandomGenerator() *rand.ChaCha8 {
+	// ChaCha8 requires a 32-byte seed: repeat the little-endian encoding of the current
+	// Unix time in milliseconds four times. The seed is time based and therefore only
+	// suitable for generating throwaway test data.
+	var seed [32]byte
+	now := uint64(time.Now().UnixMilli())
+	for offset := 0; offset < len(seed); offset += 8 {
+		binary.LittleEndian.PutUint64(seed[offset:], now)
+	}
+
+	return rand.NewChaCha8(seed)
+}
+
+// writeBatch writes batchSize random values of valueSize bytes in a single transaction.
+// The keys are the tuples (prefix, 0) ... (prefix, batchSize-1).
+func writeBatch(db fdb.Database, randomGen *rand.ChaCha8, prefix string, batchSize int, valueSize int) error {
+	_, err := db.Transact(func(transaction fdb.Transaction) (interface{}, error) {
+		for idx := 0; idx < batchSize; idx++ {
+			token := make([]byte, valueSize)
+			if _, randErr := randomGen.Read(token); randErr != nil {
+				return nil, randErr
+			}
+
+			transaction.Set(tuple.Tuple{prefix, idx}, token)
+		}
+
+		return nil, nil
+	})
+
+	return err
+}
+
+// readBatch reads back all key-value pairs that were written under the provided prefix.
+func readBatch(db fdb.Database, prefix string) ([]fdb.KeyValue, error) {
+	values, err := db.ReadTransact(func(transaction fdb.ReadTransaction) (interface{}, error) {
+		prefixRange, rangeErr := fdb.PrefixRange(tuple.Tuple{prefix}.Pack())
+		if rangeErr != nil {
+			return nil, rangeErr
+		}
+
+		return transaction.GetRange(prefixRange, fdb.RangeOptions{}).GetSliceWithError()
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	// The closure above only ever returns a []fdb.KeyValue, a failed assertion yields an empty result.
+	keyValues, _ := values.([]fdb.KeyValue)
+
+	return keyValues, nil
+}
+
+// loadData writes the requested number of keys with random values into the cluster referenced by
+// clusterFile. The keys are written in transactions of at most batchSize keys, every batch uses a
+// fresh UUID as key prefix. If readValues is true every batch is read back one second after it was
+// written to generate some read load. The method returns early when ctx is cancelled. Failed
+// batches are logged and skipped, a failure to open the database terminates the process.
+func loadData(ctx context.Context, keys int, batchSize int, valueSize int, readValues bool, clusterFile string) {
+	// Guard against a division by zero below and against a negative length passed to make.
+	if batchSize <= 0 || valueSize < 0 {
+		log.Printf("invalid workload for cluster file %s: batch size %d, value size %d\n", clusterFile, batchSize, valueSize)
+		return
+	}
+
+	log.Println("opening database with cluster file:", clusterFile)
+	db, err := fdb.OpenDatabase(clusterFile)
+	if err != nil {
+		log.Fatalf("could not open database: %s", err)
+	}
+
+	randomGen := initRandomGenerator()
+	// Round up, otherwise the keys of a trailing partial batch would never be written.
+	batchCount := (keys + batchSize - 1) / batchSize
+	for i := 0; i < batchCount; i++ {
+		if ctx.Err() != nil {
+			// Handle graceful shutdown
+			log.Printf("Stopping: %v\n", ctx.Err())
+			return
+		}
+
+		log.Println("Writing batch", i)
+
+		// The last batch only contains the remaining keys.
+		currentBatchSize := min(batchSize, keys-i*batchSize)
+		prefix := uuid.NewString()
+		if err := writeBatch(db, randomGen, prefix, currentBatchSize, valueSize); err != nil {
+			log.Printf("could not write data: %s\n", err)
+			// Nothing was committed for this prefix, so there is nothing to read back.
+			continue
+		}
+
+		if !readValues {
+			continue
+		}
+
+		// Wait one second to give the FDB cluster some time to pull the mutations from the log processes.
+		// The wait is aborted when the context is cancelled to not delay the shutdown.
+		select {
+		case <-ctx.Done():
+			log.Printf("Stopping: %v\n", ctx.Err())
+			return
+		case <-time.After(1 * time.Second):
+		}
+
+		keyValues, err := readBatch(db, prefix)
+		if err != nil {
+			log.Printf("could not read data: %s\n", err)
+			continue
+		}
+
+		log.Println("found", len(keyValues), "key values for prefix", prefix)
+	}
+}
+
+// main parses the command line flags and starts one data loader per cluster file. If
+// --cluster-file-directory is set, every file with the suffix ".cluster" in that directory is used,
+// otherwise the cluster file from $FDB_CLUSTER_FILE is used. SIGINT and SIGTERM stop all loaders.
+func main() {
+	fdb.MustAPIVersion(710)
+
+	var keys, batchSize, valueSize int
+	var readValues bool
+	var clusterFileDirectory string
+	flag.IntVar(&keys, "keys", 100000, "Number of keys to generate")
+	flag.IntVar(&batchSize, "batch-size", 10, "Number of updates per batch")
+	flag.IntVar(&valueSize, "value-size", 1000, "Number of bytes to include in each value")
+	flag.BoolVar(&readValues, "read-values", false, "Read the newly written values from the database")
+	flag.StringVar(&clusterFileDirectory, "cluster-file-directory", "", "The directory that contains the cluster file, if empty the data loader will fallback to the $FDB_CLUSTER_FILE")
+	flag.Parse()
+
+	// Fail fast on values that would otherwise cause a panic inside the loader goroutines.
+	if keys < 0 || batchSize <= 0 || valueSize < 0 {
+		log.Fatalf("invalid arguments: keys=%d (must be >= 0), batch-size=%d (must be > 0), value-size=%d (must be >= 0)", keys, batchSize, valueSize)
+	}
+
+	var clusterFiles []string
+	if clusterFileDirectory == "" {
+		clusterFiles = append(clusterFiles, os.Getenv("FDB_CLUSTER_FILE"))
+	} else {
+		log.Println("reading cluster files from", clusterFileDirectory)
+		entries, err := os.ReadDir(clusterFileDirectory)
+		if err != nil {
+			log.Fatalf("could not read directory %s: %s", clusterFileDirectory, err)
+		}
+
+		for _, entry := range entries {
+			if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".cluster") {
+				continue
+			}
+
+			clusterFiles = append(clusterFiles, path.Join(clusterFileDirectory, entry.Name()))
+		}
+
+		// Without this check the data loader would exit successfully without loading any data.
+		if len(clusterFiles) == 0 {
+			log.Fatalf("no cluster files (*.cluster) found in %s", clusterFileDirectory)
+		}
+	}
+
+	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+	defer stop()
+
+	var wg sync.WaitGroup
+	for _, clusterFile := range clusterFiles {
+		wg.Add(1)
+		go func(clusterFile string) {
+			defer wg.Done()
+			loadData(ctx, keys, batchSize, valueSize, readValues, clusterFile)
+		}(clusterFile)
+	}
+
+	wg.Wait()
+}