Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/persistence/cassandra/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func extractCurrentWorkflowConflictError(
binary, _ := conflictRecord["execution_state"].([]byte)
encoding, _ := conflictRecord["execution_state_encoding"].(string)
executionState := &persistencespb.WorkflowExecutionState{}
if state, err := serialization.WorkflowExecutionStateFromBlob(p.NewDataBlob(binary, encoding)); err == nil {
if state, err := serialization.DefaultDecoder.WorkflowExecutionStateFromBlob(p.NewDataBlob(binary, encoding)); err == nil {
executionState = state
}
// if err != nil, this means execution state cannot be parsed, just use default values
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/cassandra/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (s *cassandraErrorsSuite) TestExtractCurrentWorkflowConflictError_Success()
},
},
}
blob, err := serialization.WorkflowExecutionStateToBlob(workflowState)
blob, err := serialization.NewSerializer().WorkflowExecutionStateToBlob(workflowState)
lastWriteVersion := rand.Int63()
s.NoError(err)
t := rowTypeExecution
Expand Down
9 changes: 5 additions & 4 deletions common/persistence/cassandra/execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"go.temporal.io/server/common/persistence/serialization"
)

// Guidelines for creating new special UUID constants
Expand Down Expand Up @@ -84,11 +85,11 @@ type (

var _ p.ExecutionStore = (*ExecutionStore)(nil)

func NewExecutionStore(session gocql.Session) *ExecutionStore {
func NewExecutionStore(session gocql.Session, serializer serialization.Serializer) *ExecutionStore {
return &ExecutionStore{
HistoryStore: NewHistoryStore(session),
MutableStateStore: NewMutableStateStore(session),
MutableStateTaskStore: NewMutableStateTaskStore(session),
HistoryStore: NewHistoryStore(session, serializer),
MutableStateStore: NewMutableStateStore(session, serializer),
MutableStateTaskStore: NewMutableStateTaskStore(session, serializer),
}
}

Expand Down
15 changes: 13 additions & 2 deletions common/persistence/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.temporal.io/server/common/metrics"
p "go.temporal.io/server/common/persistence"
commongocql "go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/resolver"
)

Expand All @@ -21,6 +22,7 @@ type (
clusterName string
logger log.Logger
session commongocql.Session
serializer serialization.Serializer
}
)

Expand All @@ -32,6 +34,7 @@ func NewFactory(
clusterName string,
logger log.Logger,
metricsHandler metrics.Handler,
serializer serialization.Serializer,
) *Factory {
session, err := commongocql.NewSession(
func() (*gocql.ClusterConfig, error) {
Expand All @@ -43,7 +46,13 @@ func NewFactory(
if err != nil {
logger.Fatal("unable to initialize cassandra session", tag.Error(err))
}
return NewFactoryFromSession(cfg, clusterName, logger, session)
return NewFactoryFromSession(
cfg,
clusterName,
logger,
session,
serializer,
)
}

// NewFactoryFromSession returns an instance of a factory object from the given session.
Expand All @@ -52,12 +61,14 @@ func NewFactoryFromSession(
clusterName string,
logger log.Logger,
session commongocql.Session,
serializer serialization.Serializer,
) *Factory {
return &Factory{
cfg: cfg,
clusterName: clusterName,
logger: logger,
session: session,
serializer: serializer,
}
}

Expand Down Expand Up @@ -88,7 +99,7 @@ func (f *Factory) NewClusterMetadataStore() (p.ClusterMetadataStore, error) {

// NewExecutionStore returns a new ExecutionStore.
func (f *Factory) NewExecutionStore() (p.ExecutionStore, error) {
return NewExecutionStore(f.session), nil
return NewExecutionStore(f.session, f.serializer), nil
}

// NewQueue returns a new queue backed by cassandra
Expand Down
19 changes: 14 additions & 5 deletions common/persistence/cassandra/history_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.temporal.io/api/serviceerror"
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/primitives"
)

Expand Down Expand Up @@ -44,13 +45,17 @@ const (
type (
HistoryStore struct {
Session gocql.Session
p.HistoryBranchUtilImpl
p.HistoryBranchUtil
}
)

func NewHistoryStore(session gocql.Session) *HistoryStore {
func NewHistoryStore(
session gocql.Session,
serializer serialization.Serializer,
) *HistoryStore {
return &HistoryStore{
Session: session,
Session: session,
HistoryBranchUtil: p.NewHistoryBranchUtil(serializer),
}
}

Expand Down Expand Up @@ -137,7 +142,7 @@ func (h *HistoryStore) ReadHistoryBranch(
ctx context.Context,
request *p.InternalReadHistoryBranchRequest,
) (*p.InternalReadHistoryBranchResponse, error) {
branch, err := h.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken)
branch, err := h.ParseHistoryBranchInfo(request.BranchToken)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -346,7 +351,7 @@ func (h *HistoryStore) GetHistoryTreeContainingBranch(
request *p.InternalGetHistoryTreeContainingBranchRequest,
) (*p.InternalGetHistoryTreeContainingBranchResponse, error) {

branch, err := h.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken)
branch, err := h.ParseHistoryBranchInfo(request.BranchToken)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -391,6 +396,10 @@ func (h *HistoryStore) GetHistoryTreeContainingBranch(
}, nil
}

func (h *HistoryStore) GetHistoryBranchUtil() p.HistoryBranchUtil {
return h.HistoryBranchUtil
}

func convertHistoryNode(
message map[string]interface{},
) p.InternalHistoryNode {
Expand Down
12 changes: 7 additions & 5 deletions common/persistence/cassandra/mutable_state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,15 @@ const (

type (
MutableStateStore struct {
Session gocql.Session
Session gocql.Session
serializer serialization.Serializer
}
)

func NewMutableStateStore(session gocql.Session) *MutableStateStore {
func NewMutableStateStore(session gocql.Session, serializer serialization.Serializer) *MutableStateStore {
return &MutableStateStore{
Session: session,
Session: session,
serializer: serializer,
}
}

Expand Down Expand Up @@ -648,7 +650,7 @@ func (d *MutableStateStore) UpdateWorkflowExecution(
lastWriteVersion := updateWorkflow.LastWriteVersion

// TODO: double encoding execution state? already in updateWorkflow.ExecutionStateBlob
executionStateDatablob, err := serialization.WorkflowExecutionStateToBlob(updateWorkflow.ExecutionState)
executionStateDatablob, err := d.serializer.WorkflowExecutionStateToBlob(updateWorkflow.ExecutionState)
if err != nil {
return err
}
Expand Down Expand Up @@ -962,7 +964,7 @@ func (d *MutableStateStore) GetCurrentExecution(
}

// TODO: fix blob ExecutionState in storage should not be a blob.
executionState, err := serialization.WorkflowExecutionStateFromBlob(executionStateBlob)
executionState, err := d.serializer.WorkflowExecutionStateFromBlob(executionStateBlob)
if err != nil {
return nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions common/persistence/cassandra/mutable_state_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,15 @@ const (

type (
MutableStateTaskStore struct {
Session gocql.Session
Session gocql.Session
serializer serialization.Serializer
}
)

func NewMutableStateTaskStore(session gocql.Session) *MutableStateTaskStore {
func NewMutableStateTaskStore(session gocql.Session, serializer serialization.Serializer) *MutableStateTaskStore {
return &MutableStateTaskStore{
Session: session,
Session: session,
serializer: serializer,
}
}

Expand Down Expand Up @@ -503,7 +505,7 @@ func (d *MutableStateTaskStore) PutReplicationTaskToDLQ(
request *p.PutReplicationTaskToDLQRequest,
) error {
task := request.TaskInfo
datablob, err := serialization.ReplicationTaskInfoToBlob(task)
datablob, err := d.serializer.ReplicationTaskInfoToBlob(task)
if err != nil {
return gocql.ConvertError("PutReplicationTaskToDLQ", err)
}
Expand Down
21 changes: 12 additions & 9 deletions common/persistence/cassandra/queue_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ const (

type (
QueueStore struct {
queueType persistence.QueueType
session gocql.Session
logger log.Logger
queueType persistence.QueueType
session gocql.Session
logger log.Logger
serializer serialization.Serializer
}
)

Expand All @@ -42,9 +43,10 @@ func NewQueueStore(
logger log.Logger,
) (persistence.Queue, error) {
return &QueueStore{
queueType: queueType,
session: session,
logger: logger,
queueType: queueType,
session: session,
logger: logger,
serializer: serialization.NewSerializer(),
}, nil
}

Expand Down Expand Up @@ -300,7 +302,7 @@ func (q *QueueStore) getQueueMetadata(
return nil, err
}

return convertQueueMetadata(message)
return convertQueueMetadata(message, q.serializer)
}

func (q *QueueStore) updateAckLevel(
Expand All @@ -310,7 +312,7 @@ func (q *QueueStore) updateAckLevel(
) error {

// TODO: remove this once cluster_ack_level is removed from DB
metadataStruct, err := serialization.QueueMetadataFromBlob(metadata.Blob)
metadataStruct, err := q.serializer.QueueMetadataFromBlob(metadata.Blob)
if err != nil {
return gocql.ConvertError("updateAckLevel", err)
}
Expand Down Expand Up @@ -385,6 +387,7 @@ func convertQueueMessage(

func convertQueueMetadata(
message map[string]interface{},
serializer serialization.Serializer,
) (*persistence.InternalQueueMetadata, error) {

metadata := &persistence.InternalQueueMetadata{
Expand All @@ -394,7 +397,7 @@ func convertQueueMetadata(
if ok {
clusterAckLevel := message["cluster_ack_level"].(map[string]int64)
// TODO: remove this once we remove cluster_ack_level from DB.
blob, err := serialization.QueueMetadataToBlob(&persistencespb.QueueMetadata{ClusterAckLevels: clusterAckLevel})
blob, err := serializer.QueueMetadataToBlob(&persistencespb.QueueMetadata{ClusterAckLevels: clusterAckLevel})
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (f *factoryImpl) NewExecutionManager() (persistence.ExecutionManager, error
return nil, err
}

result := persistence.NewExecutionManager(store, f.serializer, f.eventBlobCache, f.logger, f.config.TransactionSizeLimit)
result := persistence.NewExecutionManager(store, f.serializer, serialization.NewTaskSerializer(f.serializer), f.eventBlobCache, f.logger, f.config.TransactionSizeLimit)
if f.systemRateLimiter != nil && f.namespaceRateLimiter != nil {
result = persistence.NewExecutionPersistenceRateLimitedClient(result, f.systemRateLimiter, f.namespaceRateLimiter, f.shardRateLimiter, f.logger)
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func (f *factoryImpl) NewHistoryTaskQueueManager() (persistence.HistoryTaskQueue
if err != nil {
return nil, err
}
return persistence.NewHistoryTaskQueueManager(q, serialization.NewSerializer()), nil
return persistence.NewHistoryTaskQueueManager(q, f.serializer), nil
}

func (f *factoryImpl) NewNexusEndpointManager() (persistence.NexusEndpointManager, error) {
Expand Down
9 changes: 6 additions & 3 deletions common/persistence/client/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type (
Logger log.Logger
HealthSignals persistence.HealthSignalAggregator
DynamicRateLimitingParams DynamicRateLimitingParams
Serializer serialization.Serializer
}

FactoryProviderFn func(NewFactoryParams) Factory
Expand Down Expand Up @@ -83,6 +84,7 @@ func ClusterNameProvider(config *cluster.Config) ClusterName {
func EventBlobCacheProvider(
dc *dynamicconfig.Collection,
logger log.Logger,
serializer serialization.Serializer,
) persistence.XDCCache {
return persistence.NewEventsBlobCache(
dynamicconfig.XDCCacheMaxSizeBytes.Get(dc)(),
Expand Down Expand Up @@ -128,7 +130,7 @@ func FactoryProvider(
systemRequestRateLimiter,
namespaceRequestRateLimiter,
shardRequestRateLimiter,
serialization.NewSerializer(),
params.Serializer,
params.EventBlobCache,
string(params.ClusterName),
params.MetricsHandler,
Expand Down Expand Up @@ -163,14 +165,15 @@ func DataStoreFactoryProvider(
logger log.Logger,
metricsHandler metrics.Handler,
tracerProvider trace.TracerProvider,
serializer serialization.Serializer,
) persistence.DataStoreFactory {
var dataStoreFactory persistence.DataStoreFactory
defaultStoreCfg := cfg.DataStores[cfg.DefaultStore]
switch {
case defaultStoreCfg.Cassandra != nil:
dataStoreFactory = cassandra.NewFactory(*defaultStoreCfg.Cassandra, r, string(clusterName), logger, metricsHandler)
dataStoreFactory = cassandra.NewFactory(*defaultStoreCfg.Cassandra, r, string(clusterName), logger, metricsHandler, serializer)
case defaultStoreCfg.SQL != nil:
dataStoreFactory = sql.NewFactory(*defaultStoreCfg.SQL, r, string(clusterName), logger, metricsHandler)
dataStoreFactory = sql.NewFactory(*defaultStoreCfg.SQL, r, string(clusterName), serializer, logger, metricsHandler)
case defaultStoreCfg.CustomDataStoreConfig != nil:
dataStoreFactory = abstractDataStoreFactory.NewFactory(*defaultStoreCfg.CustomDataStoreConfig, r, string(clusterName), logger, metricsHandler)
default:
Expand Down
5 changes: 3 additions & 2 deletions common/persistence/data_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1231,8 +1231,9 @@ type (
}

HistoryTaskQueueManagerImpl struct {
queue QueueV2
serializer serialization.Serializer
queue QueueV2
serializer serialization.Serializer
taskSerializer serialization.TaskSerializer
}

// QueueKey identifies a history task queue. It is converted to a queue name using the GetQueueName method.
Expand Down
Loading
Loading