diff --git a/.gitignore b/.gitignore index d87d4be..feca1db 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +spec/database.yml *.gem *.rbc .bundle diff --git a/lib/sequel/adapters/redshift.rb b/lib/sequel/adapters/redshift.rb index b693fe3..92694f1 100644 --- a/lib/sequel/adapters/redshift.rb +++ b/lib/sequel/adapters/redshift.rb @@ -1,31 +1,118 @@ require 'sequel/adapters/postgres' +require 'sequel/adapters/shared/redshift' module Sequel module Redshift include Postgres class Database < Postgres::Database + include Sequel::Redshift::DatabaseMethods + set_adapter_scheme :redshift + DIST_KEY = ' DISTKEY'.freeze + SORT_KEY = ' SORTKEY'.freeze + + # The order of column modifiers to use when defining a column. + COLUMN_DEFINITION_ORDER = [:collate, :default, :primary_key, :dist_key, :sort_key, :null, :unique, :auto_increment, :references] + + def dataset_class_default + Sequel::Redshift::Dataset + end + + # We need to change these default settings because they correspond to + # Postgres configuration variables which do not exist in Redshift + def adapter_initialize + @opts.merge!( + force_standard_strings: false, + client_min_messages: false + ) + super + end + + def table_exists?(name) + sql = <<~SQL + SELECT EXISTS ( + SELECT * FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'public' + AND c.relname = '#{name}' + AND c.relkind = 'r' + ); + SQL + fetch(sql).first.fetch(:"?column?") + end + def column_definition_primary_key_sql(sql, column) - result = super - result << ' IDENTITY' if result - result + if column[:primary_key] && column[:type] == Integer + sql << ' IDENTITY' + end + end + + # Add DISTKEY SQL fragment to column creation SQL. + def column_definition_dist_key_sql(sql, column) + if column[:dist_key] + sql << DIST_KEY + end + end + + # Add SORTKEY SQL fragment to column creation SQL. + def column_definition_sort_key_sql(sql, column) + if column[:sort_key] + sql << SORT_KEY + end end def serial_primary_key_options # redshift doesn't support serial type super.merge(serial: false) end + + # DROP TABLE IF EXISTS is now supported by Redshift + def supports_drop_table_if_exists? + true + end + + # None of the alter table operation are combinable. + def combinable_alter_table_op?(op) + false + end + + def supports_transactional_ddl? + false + end + + def supports_savepoints? + false + end end class Dataset < Postgres::Dataset Database::DatasetClass = self - # Redshift doesn't support RETURNING statement - def insert_returning_sql(sql) - # do nothing here - sql + Dataset.def_sql_method(self, :select, [['if opts[:values]', %w'values order limit'], ['elsif server_version >= 80400', %w'with select distinct columns from join where group having window compounds order limit lock'], ['else', %w'with select distinct columns from join where group having compounds order limit lock']]) + + def initialize(db) + @db = db + @opts = {disable_insert_returning: true}.freeze + @cache = {} + freeze + end + + def supports_cte?(type = :select) + true if type == :select + end + + def supports_insert_select? + false + end + + def supports_returning?(type) + false + end + + def supports_window_functions? + true end end end diff --git a/lib/sequel/adapters/shared/redshift.rb b/lib/sequel/adapters/shared/redshift.rb new file mode 100644 index 0000000..4bc40ff --- /dev/null +++ b/lib/sequel/adapters/shared/redshift.rb @@ -0,0 +1,78 @@ +module Sequel + module Redshift + module DatabaseMethods + + REDSHIFT_STRING_MAXIMUM_SIZE = 255 + + # Redshift does not support arrays (type of pg_index.indkey is INT2VECTOR), + # and because of that we can't determine the primary key - so we set it to false. + # + # The workaround for now is to use `set_primary_key` inside the sequel model. + def schema_parse_table(table_name, opts) + m = output_identifier_meth(opts[:dataset]) + + ds = metadata_dataset.select(:pg_attribute__attname___name, + SQL::Cast.new(:pg_attribute__atttypid, :integer).as(:oid), + SQL::Cast.new(:basetype__oid, :integer).as(:base_oid), + SQL::Function.new(:format_type, :basetype__oid, :pg_type__typtypmod).as(:db_base_type), + SQL::Function.new(:format_type, :pg_type__oid, :pg_attribute__atttypmod).as(:db_type), + SQL::Function.new(:pg_get_expr, :pg_attrdef__adbin, :pg_class__oid).as(:default), + SQL::BooleanExpression.new(:NOT, :pg_attribute__attnotnull).as(:allow_null)). + from(:pg_class). + join(:pg_attribute, :attrelid=>:oid). + join(:pg_type, :oid=>:atttypid). + left_outer_join(:pg_type___basetype, :oid=>:typbasetype). + left_outer_join(:pg_attrdef, :adrelid=>:pg_class__oid, :adnum=>:pg_attribute__attnum). + filter(:pg_attribute__attisdropped=>false). + filter{|o| o.pg_attribute__attnum > 0}. + filter(:pg_class__oid=>regclass_oid(table_name, opts)). + order(:pg_attribute__attnum) + + ds.map do |row| + row[:default] = nil if blank_object?(row[:default]) + if row[:base_oid] + row[:domain_oid] = row[:oid] + row[:oid] = row.delete(:base_oid) + row[:db_domain_type] = row[:db_type] + row[:db_type] = row.delete(:db_base_type) + else + row.delete(:base_oid) + row.delete(:db_base_type) + end + row[:type] = schema_column_type(row[:db_type]) + row[:primary_key] = false + [m.call(row.delete(:name)), row] + end + end + + # Redshift changes text to varchar with maximum size of 256, and it complains if you will give text column + def type_literal_generic_string(column) + "#{column[:fixed] ? 'CHAR' : 'VARCHAR'}(#{column[:size] || REDSHIFT_STRING_MAXIMUM_SIZE})" + end + + # The version of the PostgreSQL server, used for determining capability. + def server_version(server=nil) + @server_version ||= 80002 + end + + def create_table_suffix_sql(name, options) + sql = create_table_attributes(name, options) + "#{super}#{sql}" + end + + def create_table_attributes(name, options) + sql = String.new + sql << " DISTSTYLE #{options[:diststyle].upcase}" if options[:diststyle] + sql << " DISTKEY(#{options[:distkey]})" if options[:distkey] + sql << " SORTKEY(#{Array(options[:sortkey]).join(', ')})" if options[:sortkey] + sql + end + + def create_table_as_sql(name, sql, options) + result = create_table_prefix_sql(name, options) + result += create_table_attributes(name, options) + result += " AS #{sql}" + end + end + end +end diff --git a/lib/sequel/extensions/median.rb b/lib/sequel/extensions/median.rb new file mode 100644 index 0000000..700586e --- /dev/null +++ b/lib/sequel/extensions/median.rb @@ -0,0 +1,55 @@ +# frozen-string-literal: true +# +module Sequel + module SQL + module Builders + # Return a median expression. + def median(*a) + Median.new(*a) + end + end + + # The Median class represents a median function for Redshift and Postgresql databases. + class Median < GenericExpression + include OrderMethods + + module DatasetMethods + # Append the SQL fragment for the Median expression to the SQL query. + def median_sql_append(sql, sa) + if defined?(super) + return super + end + + expr = sa.expr + + case db_type = db.adapter_scheme + when :postgres + + literal_append(sql, Function.new(:percentile_disc, 0.5)) + sql << " WITHIN GROUP (ORDER BY " + identifier_append(sql, expr) + sql << ")" + + when :redshift + + literal_append(sql, Function.new(:median, expr)) + + else + raise Error, "median is not implemented on #{db.database_type}" + end + end + end + + attr_reader :expr + + # Set the expression and separator + def initialize(expr) + @expr = expr + end + + to_s_method :median_sql + end + end + + Dataset.register_extension(:median, SQL::Median::DatasetMethods) +end diff --git a/lib/sequel/extensions/pg_array.rb b/lib/sequel/extensions/pg_array.rb new file mode 100644 index 0000000..a25da1a --- /dev/null +++ b/lib/sequel/extensions/pg_array.rb @@ -0,0 +1,590 @@ +# frozen-string-literal: true +# +# This is patched version of original plugin +# @see https://github.com/jeremyevans/sequel/blob/5.71.0/lib/sequel/extensions/pg_array.rb +# + +require 'delegate' + +module Sequel + module Postgres + # Represents a PostgreSQL array column value. + class PGArray < DelegateClass(Array) + include Sequel::SQL::AliasMethods + + module DatabaseMethods + BLOB_RANGE = 1...-1 + + # Create the local hash of database type strings to schema type symbols, + # used for array types local to this database. + def self.extended(db) + db.instance_exec do + @pg_array_schema_types ||= {} + register_array_type('timestamp without time zone', :oid=>1115, :scalar_oid=>1114, :type_symbol=>:datetime) + register_array_type('timestamp with time zone', :oid=>1185, :scalar_oid=>1184, :type_symbol=>:datetime_timezone, :scalar_typecast=>:datetime) + + register_array_type('text', :oid=>1009, :scalar_oid=>25, :type_symbol=>:string) + register_array_type('integer', :oid=>1007, :scalar_oid=>23) + register_array_type('bigint', :oid=>1016, :scalar_oid=>20, :scalar_typecast=>:integer) + register_array_type('numeric', :oid=>1231, :scalar_oid=>1700, :type_symbol=>:decimal) + register_array_type('double precision', :oid=>1022, :scalar_oid=>701, :type_symbol=>:float) + + register_array_type('boolean', :oid=>1000, :scalar_oid=>16) + register_array_type('bytea', :oid=>1001, :scalar_oid=>17, :type_symbol=>:blob) + register_array_type('date', :oid=>1182, :scalar_oid=>1082) + register_array_type('time without time zone', :oid=>1183, :scalar_oid=>1083, :type_symbol=>:time) + register_array_type('time with time zone', :oid=>1270, :scalar_oid=>1266, :type_symbol=>:time_timezone, :scalar_typecast=>:time) + + register_array_type('smallint', :oid=>1005, :scalar_oid=>21, :scalar_typecast=>:integer) + register_array_type('oid', :oid=>1028, :scalar_oid=>26, :scalar_typecast=>:integer) + register_array_type('real', :oid=>1021, :scalar_oid=>700, :scalar_typecast=>:float) + register_array_type('character', :oid=>1014, :converter=>nil, :array_type=>:text, :scalar_typecast=>:string) + register_array_type('character varying', :oid=>1015, :converter=>nil, :scalar_typecast=>:string, :type_symbol=>:varchar) + + register_array_type('xml', :oid=>143, :scalar_oid=>142) + register_array_type('money', :oid=>791, :scalar_oid=>790) + register_array_type('bit', :oid=>1561, :scalar_oid=>1560) + register_array_type('bit varying', :oid=>1563, :scalar_oid=>1562, :type_symbol=>:varbit) + register_array_type('uuid', :oid=>2951, :scalar_oid=>2950) + + register_array_type('xid', :oid=>1011, :scalar_oid=>28) + register_array_type('cid', :oid=>1012, :scalar_oid=>29) + + register_array_type('name', :oid=>1003, :scalar_oid=>19) + register_array_type('tid', :oid=>1010, :scalar_oid=>27) + register_array_type('int2vector', :oid=>1006, :scalar_oid=>22) + register_array_type('oidvector', :oid=>1013, :scalar_oid=>30) + + register_array_type('super', :oid=>4000, :scalar_oid=>25, :type_symbol=>:string, :redshift_only=>true) + + [:string_array, :integer_array, :decimal_array, :float_array, :boolean_array, :blob_array, :date_array, :time_array, :datetime_array].each do |v| + @schema_type_classes[v] = PGArray + end + end + end + + def add_named_conversion_proc(name, &block) + ret = super + name = name.to_s if name.is_a?(Symbol) + from(:pg_type).where(:typname=>name).select_map([:oid, :typarray]).each do |scalar_oid, array_oid| + register_array_type(name, :oid=>array_oid.to_i, :scalar_oid=>scalar_oid.to_i) + end + ret + end + + # Handle arrays in bound variables + def bound_variable_arg(arg, conn) + case arg + when PGArray + bound_variable_array(arg.to_a) + when Array + bound_variable_array(arg) + else + super + end + end + + # Freeze the pg array schema types to prevent adding new ones. + def freeze + @pg_array_schema_types.freeze + super + end + + # Register a database specific array type. Options: + # + # :array_type :: The type to automatically cast the array to when literalizing the array. + # Usually the same as db_type. + # :converter :: A callable object (e.g. Proc), that is called with each element of the array + # (usually a string), and should return the appropriate typecasted object. + # :oid :: The PostgreSQL OID for the array type. This is used by the Sequel postgres adapter + # to set up automatic type conversion on retrieval from the database. + # :scalar_oid :: Should be the PostgreSQL OID for the scalar version of this array type. If given, + # automatically sets the :converter option by looking for scalar conversion + # proc. + # :scalar_typecast :: Should be a symbol indicating the typecast method that should be called on + # each element of the array, when a plain array is passed into a database + # typecast method. For example, for an array of integers, this could be set to + # :integer, so that the typecast_value_integer method is called on all of the + # array elements. Defaults to :type_symbol option. + # :type_symbol :: The base of the schema type symbol for this type. For example, if you provide + # :integer, Sequel will recognize this type as :integer_array during schema parsing. + # Defaults to the db_type argument. + # + # If a block is given, it is treated as the :converter option. + def register_array_type(db_type, opts=OPTS, &block) + oid = opts[:oid] + soid = opts[:scalar_oid] + + if has_converter = opts.has_key?(:converter) + raise Error, "can't provide both a block and :converter option to register_array_type" if block + converter = opts[:converter] + else + has_converter = true if block + converter = block + end + + unless (soid || has_converter) && oid + array_oid, scalar_oid = from(:pg_type).where(:typname=>db_type.to_s).get([:typarray, :oid]) + soid ||= scalar_oid unless has_converter + oid ||= array_oid + end + + db_type = db_type.to_s + type = (opts[:type_symbol] || db_type).to_sym + typecast_method_map = @pg_array_schema_types + + if soid + raise Error, "can't provide both a converter and :scalar_oid option to register" if has_converter + converter = conversion_procs[soid] + end + + array_type = (opts[:array_type] || db_type).to_s.dup.freeze + creator = Creator.new(array_type, converter, opts[:redshift_only] == true) + add_conversion_proc(oid, creator) + + typecast_method_map[db_type] = :"#{type}_array" + + singleton_class.class_eval do + meth = :"typecast_value_#{type}_array" + scalar_typecast_method = :"typecast_value_#{opts.fetch(:scalar_typecast, type)}" + define_method(meth){|v| typecast_value_pg_array(v, creator, scalar_typecast_method)} + private meth + alias_method(meth, meth) + end + + @schema_type_classes[:"#{type}_array"] = PGArray + nil + end + + private + + # Format arrays used in bound variables. + def bound_variable_array(a) + case a + when Array + "{#{a.map{|i| bound_variable_array(i)}.join(',')}}" + when Sequel::SQL::Blob + bound_variable_array_string(literal(a)[BLOB_RANGE].gsub("''", "'")) + when Sequel::LiteralString + a + when String + bound_variable_array_string(a) + when Float + if a.infinite? + a > 0 ? '"Infinity"' : '"-Infinity"' + elsif a.nan? + '"NaN"' + else + literal(a) + end + else + if (s = bound_variable_arg(a, nil)).is_a?(String) + bound_variable_array_string(s) + else + literal(a) + end + end + end + + # Escape strings used as array members in bound variables. Most complex + # will create a regular string with bound_variable_arg, and then use this + # escaping to format it as an array member. + def bound_variable_array_string(s) + "\"#{s.gsub(/("|\\)/, '\\\\\1')}\"" + end + + # Look into both the current database's array schema types and the global + # array schema types to get the type symbol for the given database type + # string. + def pg_array_schema_type(type) + @pg_array_schema_types[type] + end + + # Make the column type detection handle registered array types. + def schema_array_type(db_type) + if (db_type =~ /\A([^(]+)(?:\([^(]+\))?\[\]\z/io) && (type = pg_array_schema_type($1)) + type + else + super + end + end + + # Set the :callable_default value if the default value is recognized as an empty array. + def schema_post_process(_) + super.each do |a| + h = a[1] + if h[:default] =~ /\A(?:'\{\}'|ARRAY\[\])::([\w ]+)\[\]\z/ + type = $1.freeze + h[:callable_default] = lambda{Sequel.pg_array([], type)} + end + end + end + + # Convert ruby arrays to PostgreSQL arrays when used as default values. + def column_definition_default_sql(sql, column) + if (d = column[:default]) && d.is_a?(Array) && !Sequel.condition_specifier?(d) + if db_type == :redshift + sql << " DEFAULT (#{literal(Sequel.pg_array(d))}" + else + sql << " DEFAULT (#{literal(Sequel.pg_array(d))}::#{type_literal(column)})" + end + else + super + end + end + + # Given a value to typecast and the type of PGArray subclass: + # * If given a PGArray with a matching array_type, use it directly. + # * If given a PGArray with a different array_type, return a PGArray + # with the creator's type. + # * If given an Array, create a new PGArray instance for it. This does not + # typecast all members of the array in ruby for performance reasons, but + # it will cast the array the appropriate database type when the array is + # literalized. + def typecast_value_pg_array(value, creator, scalar_typecast_method=nil) + case value + when PGArray + if value.array_type != creator.type + PGArray.new(value.to_a, creator.type) + else + value + end + when Array + if scalar_typecast_method && respond_to?(scalar_typecast_method, true) + value = Sequel.recursive_map(value, method(scalar_typecast_method)) + end + PGArray.new(value, creator.type) + else + raise Sequel::InvalidValue, "invalid value for array type: #{value.inspect}" + end + end + end + + unless Sequel::Postgres.respond_to?(:parse_pg_array) + require 'strscan' + + # PostgreSQL array parser that handles PostgreSQL array output format. + # Note that does not handle all forms out input that PostgreSQL will + # accept, and it will not raise an error for all forms of invalid input. + class Parser < StringScanner + # Set the source for the input, and any converter callable + # to call with objects to be created. For nested parsers + # the source may contain text after the end current parse, + # which will be ignored. + def initialize(source, converter=nil) + super(source) + @converter = converter + @stack = [[]] + @encoding = string.encoding + @recorded = String.new.force_encoding(@encoding) + end + + # Take the buffer of recorded characters and add it to the array + # of entries, and use a new buffer for recorded characters. + def new_entry(include_empty=false) + if !@recorded.empty? || include_empty + entry = @recorded + if entry == 'NULL' && !include_empty + entry = nil + elsif @converter + entry = @converter.call(entry) + end + @stack.last.push(entry) + @recorded = String.new.force_encoding(@encoding) + end + end + + # Parse the input character by character, returning an array + # of parsed (and potentially converted) objects. + def parse + raise Sequel::Error, "invalid array, empty string" if eos? + raise Sequel::Error, "invalid array, doesn't start with {" unless scan(/((\[\d+:\d+\])+=)?\{/) + + # :nocov: + while !eos? + # :nocov: + char = scan(/[{}",]|[^{}",]+/) + if char == ',' + # Comma outside quoted string indicates end of current entry + new_entry + elsif char == '"' + raise Sequel::Error, "invalid array, opening quote with existing recorded data" unless @recorded.empty? + # :nocov: + while true + # :nocov: + char = scan(/["\\]|[^"\\]+/) + if char == '\\' + @recorded << getch + elsif char == '"' + n = peek(1) + raise Sequel::Error, "invalid array, closing quote not followed by comma or closing brace" unless n == ',' || n == '}' + break + else + @recorded << char + end + end + new_entry(true) + elsif char == '{' + raise Sequel::Error, "invalid array, opening brace with existing recorded data" unless @recorded.empty? + + # Start of new array, add it to the stack + new = [] + @stack.last << new + @stack << new + elsif char == '}' + # End of current array, add current entry to the current array + new_entry + + if @stack.length == 1 + raise Sequel::Error, "array parsing finished without parsing entire string" unless eos? + + # Top level of array, parsing should be over. + # Pop current array off stack and return it as result + return @stack.pop + else + # Nested array, pop current array off stack + @stack.pop + end + else + # Add the character to the recorded character buffer. + @recorded << char + end + end + + raise Sequel::Error, "array parsing finished with array unclosed" + end + end + end + + class RsParser < Parser + + def parse + raise Sequel::Error, "invalid array, empty string" if eos? + raise Sequel::Error, "invalid array, doesn't start with [" unless scan(/((\[\d+:\d+\])+=)?\[/) + + # :nocov: + while !eos? + # :nocov: + char = scan(/[\[\]",]|[^\[\]",]+/) + if char == ',' + # Comma outside quoted string indicates end of current entry + new_entry + elsif char == '"' + raise Sequel::Error, "invalid array, opening quote with existing recorded data" unless @recorded.empty? + # :nocov: + while true + # :nocov: + char = scan(/["\\]|[^"\\]+/) + if char == '\\' + @recorded << getch + elsif char == '"' + n = peek(1) + raise Sequel::Error, "invalid array, closing quote not followed by comma or closing brace" unless n == ',' || n == ']' + break + else + @recorded << char + end + end + new_entry(true) + elsif char == '[' + raise Sequel::Error, "invalid array, opening brace with existing recorded data" unless @recorded.empty? + + # Start of new array, add it to the stack + new = [] + @stack.last << new + @stack << new + elsif char == ']' + # End of current array, add current entry to the current array + new_entry + + if @stack.length == 1 + raise Sequel::Error, "array parsing finished without parsing entire string" unless eos? + + # Top level of array, parsing should be over. + # Pop current array off stack and return it as result + return @stack.pop + else + # Nested array, pop current array off stack + @stack.pop + end + else + # Add the character to the recorded character buffer. + @recorded << char + end + end + + raise Sequel::Error, "array parsing finished with array unclosed" + end + end + + # Callable object that takes the input string and parses it using Parser. + class Creator + # The converter callable that is called on each member of the array + # to convert it to the correct type. + attr_reader :converter + + # The database type to set on the PGArray instances returned. + attr_reader :type + + # Set the type and optional converter callable that will be used. + def initialize(type, converter=nil, redshift_only=false) + @type = type + @converter = converter + @redshift_only = redshift_only + end + + if Sequel::Postgres.respond_to?(:parse_pg_array) + # :nocov: + # Use sequel_pg's C-based parser if it has already been defined. + def call(string) + PGArray.new(Sequel::Postgres.parse_pg_array(string, @converter), @type) + end + # :nocov: + else + # Parse the string using Parser with the appropriate + # converter, and return a PGArray with the appropriate database + # type. + def call(string) + if @redshift_only + PGArray.new(RsParser.new(string, @converter).parse, @type) + else + PGArray.new(Parser.new(string, @converter).parse, @type) + end + end + end + end + + # The type of this array. May be nil if no type was given. If a type + # is provided, the array is automatically casted to this type when + # literalizing. This type is the underlying type, not the array type + # itself, so for an int4[] database type, it should be :int4 or 'int4' + attr_accessor :array_type + + # Set the array to delegate to, and a database type. + def initialize(array, type=nil) + super(array) + @array_type = type + end + + # Append the array SQL to the given sql string. + # If the receiver has a type, add a cast to the + # database array type. + def sql_literal_append(ds, sql) + return rs_sql_literal_append(ds, sql) if ds.db.redshift? + + at = array_type + if empty? && at + sql << "'{}'" + else + sql << "ARRAY" + _literal_append(sql, ds, to_a) + end + if at + sql << '::' << at.to_s << '[]' + end + end + + # Redshift version + def rs_sql_literal_append(ds, sql) + at = array_type + if empty? && at + sql << "'{}'" + else + sql << "ARRAY" + _rs_literal_append(sql, ds, to_a) + end + end + + # Allow automatic parameterization of the receiver if all elements can be + # can be automatically parameterized. + def sequel_auto_param_type(ds) + if array_type && all?{|x| nil == x || ds.send(:auto_param_type, x)} && db_type != :redshift + "::#{array_type}[]" + end + end + + private + + # Recursive method that handles multi-dimensional + # arrays, surrounding each with [] and interspersing + # entries with ,. + def _literal_append(sql, ds, array) + sql << '[' + comma = false + commas = ',' + array.each do |i| + sql << commas if comma + if i.is_a?(Array) + _literal_append(sql, ds, i) + else + ds.literal_append(sql, i) + end + comma = true + end + sql << ']' + end + + # Redshift version + def _rs_literal_append(sql, ds, array) + sql << '(' + comma = false + commas = ',' + array.each do |i| + sql << commas if comma + if i.is_a?(Array) + _rs_literal_append(sql, ds, i) + else + ds.literal_append(sql, i) + end + comma = true + end + sql << ')' + end + end + end + + module SQL::Builders + # Return a Postgres::PGArray proxy for the given array and database array type. + def pg_array(v, array_type=nil) + case v + when Postgres::PGArray + if array_type.nil? || v.array_type == array_type + v + else + Postgres::PGArray.new(v.to_a, array_type) + end + when Array + Postgres::PGArray.new(v, array_type) + else + # May not be defined unless the pg_array_ops extension is used + pg_array_op(v) + end + end + end + + Database.register_extension(:pg_array, Postgres::PGArray::DatabaseMethods) +end + +# :nocov: +if Sequel.core_extensions? + class Array + # Return a PGArray proxy to the receiver, using a + # specific database type if given. This is mostly useful + # as a short cut for creating PGArray objects that didn't + # come from the database. + def pg_array(type=nil) + Sequel::Postgres::PGArray.new(self, type) + end + end +end + +if defined?(Sequel::CoreRefinements) + module Sequel::CoreRefinements + refine Array do + def pg_array(type=nil) + Sequel::Postgres::PGArray.new(self, type) + end + end + end +end +# :nocov: \ No newline at end of file diff --git a/lib/sequel/extensions/pg_array_ops.rb b/lib/sequel/extensions/pg_array_ops.rb new file mode 100644 index 0000000..571c9da --- /dev/null +++ b/lib/sequel/extensions/pg_array_ops.rb @@ -0,0 +1,284 @@ +# frozen-string-literal: true +# +# This is patched version of original plugin +# @see https://github.com/jeremyevans/sequel/blob/5.71.0/lib/sequel/extensions/pg_array_ops.rb +# + +module Sequel + module Postgres + # The ArrayOp class is a simple container for a single object that + # defines methods that yield Sequel expression objects representing + # PostgreSQL array operators and functions. + # + # In the method documentation examples, assume that: + # + # array_op = :array.pg_array + class ArrayOp < Sequel::SQL::Wrapper + CONCAT = ["(".freeze, " || ".freeze, ")".freeze].freeze + CONTAINS = ["(".freeze, " @> ".freeze, ")".freeze].freeze + CONTAINED_BY = ["(".freeze, " <@ ".freeze, ")".freeze].freeze + OVERLAPS = ["(".freeze, " && ".freeze, ")".freeze].freeze + + # Access a member of the array, returns an SQL::Subscript instance: + # + # array_op[1] # array[1] + def [](key) + s = Sequel::SQL::Subscript.new(self, [key]) + s = ArrayOp.new(s) if key.is_a?(Range) + s + end + + # Call the ALL function: + # + # array_op.all # ALL(array) + # + # Usually used like: + # + # dataset.where(1=>array_op.all) + # # WHERE (1 = ALL(array)) + def all + function(:ALL) + end + + # Call the ANY function: + # + # array_op.any # ANY(array) + # + # Usually used like: + # + # dataset.where(1=>array_op.any) + # # WHERE (1 = ANY(array)) + def any + function(:ANY) + end + + # Call the cardinality method: + # + # array_op.cardinality # cardinality(array) + def cardinality + function(:cardinality) + end + + # Use the contains (@>) operator: + # + # array_op.contains(:a) # (array @> a) + def contains(other) + bool_op(CONTAINS, wrap_array(other)) + end + + # Use the contained by (<@) operator: + # + # array_op.contained_by(:a) # (array <@ a) + def contained_by(other) + bool_op(CONTAINED_BY, wrap_array(other)) + end + + # Call the array_dims method: + # + # array_op.dims # array_dims(array) + def dims + function(:array_dims) + end + + # Convert the array into an hstore using the hstore function. + # If given an argument, use the two array form: + # + # array_op.hstore # hstore(array) + # array_op.hstore(:array2) # hstore(array, array2) + def hstore(arg=(no_arg_given=true; nil)) + v = if no_arg_given + Sequel.function(:hstore, self) + else + Sequel.function(:hstore, self, wrap_array(arg)) + end + # :nocov: + if Sequel.respond_to?(:hstore_op) + # :nocov: + v = Sequel.hstore_op(v) + end + v + end + + # Call the array_length method: + # + # array_op.length # array_length(array, 1) + # array_op.length(2) # array_length(array, 2) + def length(dimension = 1) + function(:array_length, dimension) + end + + # Call the array_lower method: + # + # array_op.lower # array_lower(array, 1) + # array_op.lower(2) # array_lower(array, 2) + def lower(dimension = 1) + function(:array_lower, dimension) + end + + # Use the overlaps (&&) operator: + # + # array_op.overlaps(:a) # (array && a) + def overlaps(other) + bool_op(OVERLAPS, wrap_array(other)) + end + + # Use the concatentation (||) operator: + # + # array_op.push(:a) # (array || a) + # array_op.concat(:a) # (array || a) + def push(other) + array_op(CONCAT, [self, wrap_array(other)]) + end + alias concat push + + # Return the receiver. + def pg_array + self + end + + # Remove the given element from the array: + # + # array_op.remove(1) # array_remove(array, 1) + def remove(element) + ArrayOp.new(function(:array_remove, element)) + end + + # Replace the given element in the array with another + # element: + # + # array_op.replace(1, 2) # array_replace(array, 1, 2) + def replace(element, replacement) + ArrayOp.new(function(:array_replace, element, replacement)) + end + + # Call the array_to_string method: + # + # array_op.join # array_to_string(array, '') + # array_op.to_string # array_to_string(array, '') + # array_op.join(":") # array_to_string(array, ':') + # array_op.join(":", "*") # array_to_string(array, ':', '*') + def to_string(joiner="", null=nil) + if null.nil? + function(:array_to_string, joiner) + else + function(:array_to_string, joiner, null) + end + end + alias join to_string + + def rs_to_string(joiner="", null=nil) + query = <<~SQL.squish + ( + SELECT + listagg(json_serialize(el), ?) + FROM + ? AS el + ) + SQL + + Sequel.lit(query, joiner, pg_array) + end + alias rs_join rs_to_string + + # Call the unnest method: + # + # array_op.unnest # unnest(array) + def unnest(*args) + function(:unnest, *args.map{|a| wrap_array(a)}) + end + + # Use the concatentation (||) operator, reversing the order: + # + # array_op.unshift(:a) # (a || array) + def unshift(other) + array_op(CONCAT, [wrap_array(other), self]) + end + + private + + # Return a placeholder literal with the given str and args, wrapped + # in an ArrayOp, used by operators that return arrays. + def array_op(str, args) + ArrayOp.new(Sequel::SQL::PlaceholderLiteralString.new(str, args)) + end + + # Return a placeholder literal with the given str and args, wrapped + # in a boolean expression, used by operators that return booleans. + def bool_op(str, other) + Sequel::SQL::BooleanExpression.new(:NOOP, Sequel::SQL::PlaceholderLiteralString.new(str, [value, other])) + end + + # Return a function with the given name, and the receiver as the first + # argument, with any additional arguments given. + def function(name, *args) + SQL::Function.new(name, self, *args) + end + + # Automatically wrap argument in a PGArray if it is a plain Array. + # Requires that the pg_array extension has been loaded to work. + def wrap_array(arg) + if arg.instance_of?(Array) + Sequel.pg_array(arg) + else + arg + end + end + end + + module ArrayOpMethods + # Wrap the receiver in an ArrayOp so you can easily use the PostgreSQL + # array functions and operators with it. + def pg_array + ArrayOp.new(self) + end + end + + # :nocov: + if defined?(PGArray) + # :nocov: + class PGArray + # Wrap the PGArray instance in an ArrayOp, allowing you to easily use + # the PostgreSQL array functions and operators with literal arrays. + def op + ArrayOp.new(self) + end + end + end + end + + module SQL::Builders + # Return the object wrapped in an Postgres::ArrayOp. + def pg_array_op(v) + case v + when Postgres::ArrayOp + v + else + Postgres::ArrayOp.new(v) + end + end + end + + class SQL::GenericExpression + include Sequel::Postgres::ArrayOpMethods + end + + class LiteralString + include Sequel::Postgres::ArrayOpMethods + end +end + +# :nocov: +if Sequel.core_extensions? + class Symbol + include Sequel::Postgres::ArrayOpMethods + end +end + +if defined?(Sequel::CoreRefinements) + module Sequel::CoreRefinements + refine Symbol do + send INCLUDE_METH, Sequel::Postgres::ArrayOpMethods + end + end +end +# :nocov: \ No newline at end of file diff --git a/lib/sequel/extensions/redshift_string_agg.rb b/lib/sequel/extensions/redshift_string_agg.rb new file mode 100644 index 0000000..c6ee4bc --- /dev/null +++ b/lib/sequel/extensions/redshift_string_agg.rb @@ -0,0 +1,98 @@ +# frozen-string-literal: true +# +# Please consider using patched `string_agg` extension, it properly handles all supported DB adapters +# including Redshift out of the box. +# +# The redshift_string_agg extension adds the ability to perform database-independent +# aggregate string concatentation on Amazon Redshift. + +# Related module: Sequel::SQL::RedshiftStringAgg +module Sequel + module SQL + module Builders + # Return a RedshiftStringAgg expression for an aggregate string concatentation. + def redshift_string_agg(*a) + RedshiftStringAgg.new(*a) + end + end + + # The RedshiftStringAgg class represents an aggregate string concatentation. + class RedshiftStringAgg < GenericExpression + include StringMethods + include StringConcatenationMethods + include InequalityMethods + include AliasMethods + include CastMethods + include OrderMethods + include PatternMatchMethods + include SubscriptMethods + + # These methods are added to datasets using the redshift_string_agg + # extension, for the purposes of correctly literalizing RedshiftStringAgg + # expressions for the appropriate database type. + module DatasetMethods + # Append the SQL fragment for the RedshiftStringAgg expression to the SQL query. + def redshift_string_agg_sql_append(sql, sa) + unless db.adapter_scheme == :redshift + raise Error, "redshift_string_agg is not implemented on #{db.adapter_scheme}" + end + + expr = sa.expr + separator = sa.separator || "," + order = sa.order_expr + distinct = sa.is_distinct? + + if distinct + raise Error, "redshift_string_agg with distinct is not implemented on #{db.database_type}" + end + literal_append(sql, Function.new(:listagg, expr, separator)) + if order + sql << " WITHIN GROUP (ORDER BY " + expression_list_append(sql, order) + sql << ")" + else + sql << " WITHIN GROUP (ORDER BY 1)" + end + end + end + + # The string expression for each row that will concatenated to the output. + attr_reader :expr + + # The separator between each string expression. + attr_reader :separator + + # The expression that the aggregation is ordered by. + attr_reader :order_expr + + # Set the expression and separator + def initialize(expr, separator=nil) + @expr = expr + @separator = separator + end + + # Whether the current expression uses distinct expressions + def is_distinct? + @distinct == true + end + + # Return a modified RedshiftStringAgg that uses distinct expressions + def distinct + sa = dup + sa.instance_variable_set(:@distinct, true) + sa + end + + # Return a modified RedshiftStringAgg with the given order + def order(*o) + sa = dup + sa.instance_variable_set(:@order_expr, o.empty? ? nil : o) + sa + end + + to_s_method :redshift_string_agg_sql + end + end + + Dataset.register_extension(:redshift_string_agg, SQL::RedshiftStringAgg::DatasetMethods) +end diff --git a/lib/sequel/extensions/string_agg.rb b/lib/sequel/extensions/string_agg.rb new file mode 100644 index 0000000..60d671e --- /dev/null +++ b/lib/sequel/extensions/string_agg.rb @@ -0,0 +1,182 @@ +# frozen-string-literal: true +# +# This is patched version of original plugin +# @see https://github.com/jeremyevans/sequel/blob/4.49.0/lib/sequel/extensions/string_agg.rb +# The reasons for this patch are 4.49 is not updated anymore and Redshift is not officially +# supported by sequel. +# +# The string_agg extension adds the ability to perform database-independent +# aggregate string concatentation. For example, with a table like: +# +# c1 | c2 +# ---+--- +# a | 1 +# a | 2 +# a | 3 +# b | 4 +# +# You can return a result set like: +# +# c1 | c2s +# ---+--- +# a | 1,2,3 +# b | 4 +# +# First, you need to load the extension into the database: +# +# DB.extension :string_agg +# +# Then you can use the Sequel.string_agg method to return a Sequel +# expression: +# +# sa = Sequel.string_agg(:column_name) +# # or: +# sa = Sequel.string_agg(:column_name, '-') # custom separator +# +# You can specify the order in which the concatention happens by +# calling +order+ on the expression: +# +# sa = Sequel.string_agg(:column_name).order(:other_column) +# +# Additionally, if you want to have the concatenation only operate +# on distinct values, you can call distinct: +# +# sa = Sequel.string_agg(:column_name).order(:other_column).distinct +# +# These expressions can be used in your datasets, or anywhere else that +# Sequel expressions are allowed: +# +# DB[:table]. +# select_group(:c1). +# select_append(Sequel.string_agg(:c2)) +# +# This extension currenly supports the following databases: +# +# * PostgreSQL 9+ +# * SQLAnywhere 12+ +# * Oracle 11g+ (except distinct) +# * DB2 9.7+ (except distinct) +# * MySQL +# * HSQLDB +# * H2 +# +# Related module: Sequel::SQL::StringAgg + +# +module Sequel + module SQL + module Builders + # Return a StringAgg expression for an aggregate string concatentation. + def string_agg(*a) + StringAgg.new(*a) + end + end + + # The StringAgg class represents an aggregate string concatentation. + class StringAgg < GenericExpression + include StringMethods + include StringConcatenationMethods + include InequalityMethods + include AliasMethods + include CastMethods + include OrderMethods + include PatternMatchMethods + include SubscriptMethods + + # These methods are added to datasets using the string_agg + # extension, for the purposes of correctly literalizing StringAgg + # expressions for the appropriate database type. + module DatasetMethods + # Append the SQL fragment for the StringAgg expression to the SQL query. + def string_agg_sql_append(sql, sa) + if defined?(super) + return super + end + + expr = sa.expr + separator = sa.separator || "," + order = sa.order_expr + distinct = sa.is_distinct? + + case db_type = db.adapter_scheme + when :postgres, :sqlanywhere + f = Function.new(db_type == :postgres ? :string_agg : :list, expr, separator) + if order + f = f.order(*order) + end + if distinct + f = f.distinct + end + literal_append(sql, f) + # SEQUEL5: Remove cubrid + when :mysql, :hsqldb, :cubrid, :h2 + sql << "GROUP_CONCAT(" + if distinct + sql << "DISTINCT " + end + literal_append(sql, expr) + if order + sql << " ORDER BY " + expression_list_append(sql, order) + end + sql << " SEPARATOR " + literal_append(sql, separator) + sql << ")" + when :oracle, :db2, :redshift + if distinct + raise Error, "string_agg with distinct is not implemented on #{db.database_type}" + end + literal_append(sql, Function.new(:listagg, expr, separator)) + if order + sql << " WITHIN GROUP (ORDER BY " + expression_list_append(sql, order) + sql << ")" + else + sql << " WITHIN GROUP (ORDER BY 1)" + end + else + raise Error, "string_agg is not implemented on #{db.database_type}" + end + end + end + + # The string expression for each row that will concatenated to the output. + attr_reader :expr + + # The separator between each string expression. + attr_reader :separator + + # The expression that the aggregation is ordered by. + attr_reader :order_expr + + # Set the expression and separator + def initialize(expr, separator=nil) + @expr = expr + @separator = separator + end + + # Whether the current expression uses distinct expressions + def is_distinct? + @distinct == true + end + + # Return a modified StringAgg that uses distinct expressions + def distinct + sa = dup + sa.instance_variable_set(:@distinct, true) + sa + end + + # Return a modified StringAgg with the given order + def order(*o) + sa = dup + sa.instance_variable_set(:@order_expr, o.empty? ? nil : o) + sa + end + + to_s_method :string_agg_sql + end + end + + Dataset.register_extension(:string_agg, SQL::StringAgg::DatasetMethods) +end diff --git a/lib/sequel/helpers.rb b/lib/sequel/helpers.rb new file mode 100644 index 0000000..c4d68ba --- /dev/null +++ b/lib/sequel/helpers.rb @@ -0,0 +1,7 @@ +module Sequel + class Database + def redshift? + adapter_scheme == :redshift + end + end +end diff --git a/lib/sequel/redshift.rb b/lib/sequel/redshift.rb index a7aa1d8..9869bcb 100644 --- a/lib/sequel/redshift.rb +++ b/lib/sequel/redshift.rb @@ -1 +1,2 @@ require "sequel/redshift/version" +require "sequel/helpers" diff --git a/lib/sequel/redshift/version.rb b/lib/sequel/redshift/version.rb index 74b4aab..7b5c033 100644 --- a/lib/sequel/redshift/version.rb +++ b/lib/sequel/redshift/version.rb @@ -1,5 +1,5 @@ module Sequel module Redshift - VERSION = "0.0.1" + VERSION = "0.0.6" end end diff --git a/sequel-redshift.gemspec b/sequel-redshift.gemspec index 1cc251c..49b5d2e 100644 --- a/sequel-redshift.gemspec +++ b/sequel-redshift.gemspec @@ -19,9 +19,9 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.add_dependency "pg" - spec.add_dependency "sequel" + spec.add_dependency "sequel", '~> 5' - spec.add_development_dependency "bundler", "~> 1.3" + spec.add_development_dependency "bundler", "~> 2" spec.add_development_dependency "rake" spec.add_development_dependency "rspec" end diff --git a/spec/database.yml.example b/spec/database.yml.example new file mode 100644 index 0000000..db31a2f --- /dev/null +++ b/spec/database.yml.example @@ -0,0 +1,6 @@ +adapter: redshift +database: test1 +username: +password: +host: r101-dw-production.ck1sr65ao7pg.us-east-1.redshift.amazonaws.com +port: 5439 diff --git a/spec/features/extensions_spec.rb b/spec/features/extensions_spec.rb new file mode 100644 index 0000000..49f74a6 --- /dev/null +++ b/spec/features/extensions_spec.rb @@ -0,0 +1,63 @@ +require 'spec_helper' + +describe 'Extensions' do + describe 'string_agg' do + before { DB.extension :string_agg } + + it 'generates correct sql for Redshift' do + allow(DB).to receive(:adapter_scheme).and_return(:redshift) + expect( + DB[:test]. + select( + Sequel. + string_agg(Sequel[:revenue].cast_string). + order(Sequel.asc(:period_start), Sequel.asc(:sub_period_start)). + as(:relative) + ).sql).to eq( + "SELECT listagg(CAST(\"revenue\" AS varchar(255)), ',') WITHIN GROUP (ORDER BY \"period_start\" ASC, \"sub_period_start\" ASC) AS \"relative\" FROM \"test\"" + ) + end + + it 'generates correct sql for Postgresql' do + allow(DB).to receive(:adapter_scheme).and_return(:postgres) + expect( + DB[:test]. + select( + Sequel. + string_agg(Sequel[:revenue].cast_string). + order(Sequel.asc(:period_start), Sequel.asc(:sub_period_start)). + as(:relative) + ).sql).to eq( + "SELECT string_agg(CAST(\"revenue\" AS varchar(255)), ',' ORDER BY \"period_start\" ASC, \"sub_period_start\" ASC) AS \"relative\" FROM \"test\"" + ) + end + end + + describe 'median' do + before { DB.extension :median } + + it 'generates correct sql for Redshift' do + allow(DB).to receive(:adapter_scheme).and_return(:redshift) + expect( + DB[:test]. + select( + Sequel. + median(:revenue) + ).sql).to eq( + "SELECT median(\"revenue\") FROM \"test\"" + ) + end + + it 'generates correct sql for Postgresql' do + allow(DB).to receive(:adapter_scheme).and_return(:postgres) + expect( + DB[:test]. + select( + Sequel. + median(:revenue) + ).sql).to eq( + "SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY \"revenue\") FROM \"test\"" + ) + end + end +end diff --git a/spec/features/redshift_spec.rb b/spec/features/redshift_spec.rb index f0d4376..69d7c64 100644 --- a/spec/features/redshift_spec.rb +++ b/spec/features/redshift_spec.rb @@ -9,6 +9,7 @@ end it "inserts new record" do + DB.drop_table? :items DB.create_table :items do primary_key :id column :name, 'varchar(255)' diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index b10f42b..56c8cb2 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -5,10 +5,26 @@ require 'sequel' require 'logger' +require 'yaml' + +$: << "." +require 'sequel/redshift' + +config_path = File.join(File.dirname(__FILE__), "database.yml") + +unless File.exists?(config_path) + warn "spec/database.yml does not exist." + warn "Create it based on spec/database.yml.example\nto conenct to a redshift cluster." + exit 1 +end + +options = YAML.load(File.read(config_path)) +options.merge(logger: Logger.new(STDOUT)) + +DB = Sequel.connect(options) +if ENV['DEBUG_SQL'] == 'true' + DB.loggers << Logger.new($stdout) +end +# Run all the tests in a specific test schema +DB.run "set search_path to 'sequel_redshift_adapter_test'" -options = { - client_min_messages: false, - force_standard_strings: false, - logger: Logger.new(STDOUT) -} -DB = Sequel.connect('redshift://remind101:wvpfSzw2TT8COE@r101-dw-production.ck1sr65ao7pg.us-east-1.redshift.amazonaws.com:5439/test1', options)