diff --git a/lib/spark/build.rb b/lib/spark/build.rb
index e1c3fed..3e2c616 100644
--- a/lib/spark/build.rb
+++ b/lib/spark/build.rb
@@ -3,7 +3,7 @@ module Build
 
     DEFAULT_SCALA_VERSION = '2.10.4'
     DEFAULT_CORE_VERSION = '2.10'
-    DEFAULT_SPARK_VERSION = '1.5.1'
+    DEFAULT_SPARK_VERSION = '1.5.2'
     DEFAULT_HADOOP_VERSION = '1.0.4'
 
     SBT = 'sbt/sbt'
diff --git a/lib/spark/sql.rb b/lib/spark/sql.rb
index b2611fb..7434d04 100644
--- a/lib/spark/sql.rb
+++ b/lib/spark/sql.rb
@@ -6,6 +6,7 @@ module SQL
     autoload_without_import :DataType, 'spark/sql/data_type'
     autoload_without_import :DataFrame, 'spark/sql/data_frame'
     autoload_without_import :DataFrameReader, 'spark/sql/data_frame_reader'
+    autoload_without_import :DataFrameWriter, 'spark/sql/data_frame_writer'
 
     autoload :Row, 'spark/sql/row'
     autoload :Column, 'spark/sql/column'
diff --git a/lib/spark/sql/context.rb b/lib/spark/sql/context.rb
index d0da2af..3b55274 100644
--- a/lib/spark/sql/context.rb
+++ b/lib/spark/sql/context.rb
@@ -13,6 +13,12 @@
       def read
        DataFrameReader.new(self)
       end
+
+      # Runs a SQL query and returns the result as a {DataFrame}.
+      def sql(query)
+        DataFrame.new(jsql_context.sql(query).toDF, self)
+      end
+
     end
   end
 end
diff --git a/lib/spark/sql/data_frame.rb b/lib/spark/sql/data_frame.rb
index 13adb09..ad702e5 100644
--- a/lib/spark/sql/data_frame.rb
+++ b/lib/spark/sql/data_frame.rb
@@ -250,9 +250,19 @@
 
      # Limits the result count to the number specified.
      def limit(num)
-        new_jdf = jdf.limit(mum)
+        new_jdf = jdf.limit(num)
        DataFrame.new(new_jdf, sql_context)
      end
 
+      # Registers this DataFrame as a temporary table with the given name.
+      def register_temp_table(name)
+        jdf.registerTempTable(name)
+      end
+
+      # Returns a {DataFrameWriter} for saving the contents of the DataFrame.
+      def write
+        DataFrameWriter.new(self)
+      end
+
      alias_method :where, :filter
 
diff --git a/lib/spark/sql/data_frame_reader.rb b/lib/spark/sql/data_frame_reader.rb
index 9e6e72a..42c0051 100644
--- a/lib/spark/sql/data_frame_reader.rb
+++ b/lib/spark/sql/data_frame_reader.rb
@@ -93,6 +93,22 @@
      def json(path, new_schema=nil)
        load(path, 'org.apache.spark.sql.execution.datasources.json', new_schema)
      end
+
+      # Loads a Parquet file and returns the result as a {DataFrame}.
+      def parquet(path, new_schema=nil)
+        load(path, 'org.apache.spark.sql.execution.datasources.parquet', new_schema)
+      end
+
+      # Loads an ORC file and returns the result as a {DataFrame}.
+      def orc(path, new_schema=nil)
+        load(path, 'org.apache.spark.sql.execution.datasources.orc', new_schema)
+      end
+
+      # Returns the specified table as a {DataFrame}.
+      def table(name)
+        df(jreader.table(name))
+      end
+
     end
   end
 end
diff --git a/lib/spark/sql/data_frame_writer.rb b/lib/spark/sql/data_frame_writer.rb
new file mode 100644
index 0000000..4c99e3b
--- /dev/null
+++ b/lib/spark/sql/data_frame_writer.rb
@@ -0,0 +1,109 @@
+module Spark
+  module SQL
+    class DataFrameWriter
+
+      attr_reader :sql_context, :jwriter
+
+      def initialize(df)
+        @sql_context = df.sql_context
+        @jwriter = df.jdf.write
+      end
+
+      # Specifies the output data source format.
+      # Parameter is the name of the data source, e.g. 'json', 'parquet'.
+      def format(source)
+        jwriter.format(source)
+        self
+      end
+
+      # Adds an output option for the underlying data source.
+      def option(key, value)
+        jwriter.option(key, value.to_s)
+        self
+      end
+
+      # Adds output options for the underlying data source.
+      def options(options)
+        options.each do |key, value|
+          jwriter.option(key, value.to_s)
+        end
+        self
+      end
+
+      # Saves the contents of the DataFrame to a data source.
+      #
+      # == Parameters:
+      # path:: Optional string for file-system backed data sources.
+      # format:: Optional string for the format of the data source. Defaults to 'parquet'.
+      # options:: All other string options.
+      #
+      def save(path=nil, new_format=nil, new_options=nil)
+        new_format && format(new_format)
+        new_options && options(new_options)
+
+        # TODO: jwriter.save returns nil; probably should catch exceptions
+        # and return true/false
+        if path.nil?
+          jwriter.save
+        else
+          jwriter.save(path)
+        end
+      end
+
+      # Saves the DataFrame as a JSON file (one object per line).
+      #
+      # == Parameters:
+      # path:: string, path to the JSON dataset
+      #
+      # == Example:
+      #   df.write.json('output.json')
+      #
+      def json(path)
+        # ClassNotFoundException: Failed to load class for data source: json
+        # jwriter.json(path)
+
+        save(path, 'org.apache.spark.sql.execution.datasources.json')
+      end
+
+      # Saves the DataFrame as a Parquet file.
+      def parquet(path)
+        # ClassNotFoundException: Failed to load class for data source: parquet
+        # jwriter.parquet(path)
+
+        save(path, 'org.apache.spark.sql.execution.datasources.parquet')
+      end
+
+      # Saves the DataFrame as an ORC file.
+      def orc(path)
+        # ClassNotFoundException: Failed to load class for data source: orc
+        # jwriter.orc(path)
+
+        save(path, 'org.apache.spark.sql.execution.datasources.orc')
+      end
+
+      # Inserts the contents of the DataFrame into the given table.
+      def insert_into(table_name)
+        jwriter.insertInto(table_name)
+      end
+
+      # Saves the contents of the DataFrame as the given table.
+      def save_as_table(name)
+        jwriter.saveAsTable(name)
+      end
+
+      # Partitions the output by the given columns.
+      def partition_by(columns)
+        jwriter.partitionBy(columns)
+        self
+      end
+
+      # Specifies the behavior when data or a table already exists:
+      # 'append', 'overwrite', 'error' or 'ignore'.
+      def mode(name)
+        jwriter.mode(name)
+        self
+      end
+
+    end
+  end
+end
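
Usage sketch for the API added above. A minimal example, assuming an initialized Spark::SQL::Context available as `sql`; the 'people.json' and 'adults.parquet' paths are hypothetical:

    # Read JSON into a DataFrame and expose it to SQL queries.
    df = sql.read.json('people.json')
    df.register_temp_table('people')

    # Context#sql returns the result as a DataFrame.
    adults = sql.sql('SELECT name, age FROM people WHERE age >= 18')

    # Writer calls chain; a format-specific call like #parquet performs the save.
    adults.write.mode('overwrite').parquet('adults.parquet')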