From 97f872263eaa5bedf0c046f07e82b9ed9fe4d2ec Mon Sep 17 00:00:00 2001
From: Tom Zeng
Date: Sun, 22 Nov 2015 14:23:50 -0800
Subject: [PATCH 1/3] Correct typo in #limit

---
 lib/spark/sql/data_frame.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/spark/sql/data_frame.rb b/lib/spark/sql/data_frame.rb
index 13adb09..1a4fdeb 100644
--- a/lib/spark/sql/data_frame.rb
+++ b/lib/spark/sql/data_frame.rb
@@ -250,7 +250,7 @@ def filter(condition)
 
     # Limits the result count to the number specified.
     def limit(num)
-      new_jdf = jdf.limit(mum)
+      new_jdf = jdf.limit(num)
       DataFrame.new(new_jdf, sql_context)
     end
 

From d5a8397d4de9167f384eb9d3038d6b2882421e23 Mon Sep 17 00:00:00 2001
From: Tom Zeng
Date: Sun, 22 Nov 2015 23:42:22 -0800
Subject: [PATCH 2/3] Add DataFrameWriter and related methods for DataFrame
 write support

---
 lib/spark/build.rb                 |  2 +-
 lib/spark/sql.rb                   |  1 +
 lib/spark/sql/context.rb           |  4 ++
 lib/spark/sql/data_frame.rb        |  7 +++
 lib/spark/sql/data_frame_reader.rb | 12 ++++
 lib/spark/sql/data_frame_writer.rb | 96 ++++++++++++++++++++++++++++++
 6 files changed, 121 insertions(+), 1 deletion(-)
 create mode 100644 lib/spark/sql/data_frame_writer.rb

diff --git a/lib/spark/build.rb b/lib/spark/build.rb
index e1c3fed..3e2c616 100644
--- a/lib/spark/build.rb
+++ b/lib/spark/build.rb
@@ -3,7 +3,7 @@ module Build
 
   DEFAULT_SCALA_VERSION  = '2.10.4'
   DEFAULT_CORE_VERSION   = '2.10'
-  DEFAULT_SPARK_VERSION  = '1.5.1'
+  DEFAULT_SPARK_VERSION  = '1.5.2'
   DEFAULT_HADOOP_VERSION = '1.0.4'
 
   SBT = 'sbt/sbt'

diff --git a/lib/spark/sql.rb b/lib/spark/sql.rb
index b2611fb..7434d04 100644
--- a/lib/spark/sql.rb
+++ b/lib/spark/sql.rb
@@ -6,6 +6,7 @@ module SQL
   autoload_without_import :DataType,        'spark/sql/data_type'
   autoload_without_import :DataFrame,       'spark/sql/data_frame'
   autoload_without_import :DataFrameReader, 'spark/sql/data_frame_reader'
+  autoload_without_import :DataFrameWriter, 'spark/sql/data_frame_writer'
 
   autoload :Row,    'spark/sql/row'
   autoload :Column, 'spark/sql/column'

diff --git a/lib/spark/sql/context.rb b/lib/spark/sql/context.rb
index d0da2af..3b55274 100644
--- a/lib/spark/sql/context.rb
+++ b/lib/spark/sql/context.rb
@@ -13,6 +13,10 @@ def read
       DataFrameReader.new(self)
     end
 
+    def sql(query)
+      DataFrame.new(jsql_context.sql(query).toDF, self)
+    end
+
   end
 end
 end

diff --git a/lib/spark/sql/data_frame.rb b/lib/spark/sql/data_frame.rb
index 1a4fdeb..ad702e5 100644
--- a/lib/spark/sql/data_frame.rb
+++ b/lib/spark/sql/data_frame.rb
@@ -254,6 +254,13 @@ def limit(num)
       DataFrame.new(new_jdf, sql_context)
     end
 
+    def register_temp_table(name)
+      jdf.registerTempTable(name)
+    end
+
+    def write
+      DataFrameWriter.new(self)
+    end
 
     alias_method :where, :filter
 

diff --git a/lib/spark/sql/data_frame_reader.rb b/lib/spark/sql/data_frame_reader.rb
index 9e6e72a..42c0051 100644
--- a/lib/spark/sql/data_frame_reader.rb
+++ b/lib/spark/sql/data_frame_reader.rb
@@ -93,6 +93,18 @@ def json(path, new_schema=nil)
       load(path, 'org.apache.spark.sql.execution.datasources.json', new_schema)
     end
 
+    def parquet(path, new_schema=nil)
+      load(path, 'org.apache.spark.sql.execution.datasources.parquet', new_schema)
+    end
+
+    def orc(path, new_schema=nil)
+      load(path, 'org.apache.spark.sql.execution.datasources.orc', new_schema)
+    end
+
+    def table(name)
+      df(jreader.table(name))
+    end
+
   end
 end
 end

diff --git a/lib/spark/sql/data_frame_writer.rb b/lib/spark/sql/data_frame_writer.rb
new file mode 100644
index 0000000..ad752df
--- /dev/null
+++ b/lib/spark/sql/data_frame_writer.rb
@@ -0,0 +1,96 @@
+module Spark
+  module SQL
+    class DataFrameWriter
+
+      attr_reader :sql_context, :jwriter
+
+      def initialize(df)
+        @sql_context = df.sql_context
+        @jwriter = df.jdf.write
+      end
+
+      # Specifies the underlying output data source format.
+      # Parameter is the name of the data source, e.g. 'json', 'parquet'.
+      def format(source)
+        jwriter.format(source)
+        self
+      end
+
+      # Adds an output option for the underlying data source.
+      def option(key, value)
+        jwriter.option(key, value.to_s)
+        self
+      end
+
+      # Adds output options for the underlying data source.
+      def options(options)
+        options.each do |key, value|
+          jwriter.option(key, value.to_s)
+        end
+        self
+      end
+
+      # Saves the contents of the DataFrame to a data source.
+      #
+      # == Parameters:
+      # path:: Optional string for file-system backed data sources.
+      # new_format:: Optional string for the data source format.
+      #              Defaults to 'parquet'.
+      # new_options:: All other string options.
+      #
+      def save(path=nil, new_format=nil, new_options=nil)
+        new_format && format(new_format)
+        new_options && options(new_options)
+
+        # TODO: jwriter.save returns nil; probably should catch exceptions and return true/false
+        if path.nil?
+          jwriter.save
+        else
+          jwriter.save(path)
+        end
+      end
+
+      # Saves the DataFrame as a JSON file (one object per line).
+      #
+      # == Parameters:
+      # path:: string, path to the JSON dataset
+      #
+      # == Example:
+      #   df.write.json('output.json')
+      #
+      def json(path)
+        # ClassNotFoundException: Failed to load class for data source: json
+        # df(jwriter.json(path))
+
+        save(path, 'org.apache.spark.sql.execution.datasources.json')
+      end
+
+      def parquet(path)
+        # ClassNotFoundException: Failed to load class for data source: parquet
+        # df(jwriter.parquet(path))
+
+        save(path, 'org.apache.spark.sql.execution.datasources.parquet')
+      end
+
+      def orc(path)
+        # ClassNotFoundException: Failed to load class for data source: orc
+        # df(jwriter.orc(path))
+
+        save(path, 'org.apache.spark.sql.execution.datasources.orc')
+      end
+
+      def insert_into
+      end
+
+      def save_as_table
+      end
+
+      def partition_by
+      end
+
+      def mode
+      end
+
+    end
+  end
+end
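Usage sketch for the write support added in PATCH 2 — a minimal example, assuming `sql` is a Spark::SQL::Context; the file and table names here are hypothetical:

    # Read a JSON dataset and register it so it can be queried with SQL.
    df = sql.read.json('people.json')
    df.register_temp_table('people')

    # Context#sql returns the result as a DataFrame.
    adults = sql.sql('SELECT name, age FROM people WHERE age >= 18')

    # DataFrame#write returns a DataFrameWriter; the #json, #parquet,
    # and #orc helpers pass the fully qualified data source class to
    # #save, working around the ClassNotFoundException noted above.
    adults.write.json('adults.json')
    adults.write.parquet('adults.parquet')
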
From 1187e929334757c66498e72b72e02094ed29a194 Mon Sep 17 00:00:00 2001
From: Tom Zeng
Date: Mon, 23 Nov 2015 12:10:05 -0800
Subject: [PATCH 3/3] Add #insert_into, #save_as_table, #partition_by, #mode
 to DataFrameWriter

---
 lib/spark/sql/data_frame_writer.rb | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/lib/spark/sql/data_frame_writer.rb b/lib/spark/sql/data_frame_writer.rb
index ad752df..4c99e3b 100644
--- a/lib/spark/sql/data_frame_writer.rb
+++ b/lib/spark/sql/data_frame_writer.rb
@@ -79,16 +79,21 @@ def orc(path)
         save(path, 'org.apache.spark.sql.execution.datasources.orc')
       end
 
-      def insert_into
+      def insert_into(table_name)
+        jwriter.insertInto(table_name)
       end
 
-      def save_as_table
+      def save_as_table(name)
+        jwriter.saveAsTable(name)
       end
 
-      def partition_by
+      def partition_by(columns)
+        jwriter.partitionBy(columns)
       end
 
-      def mode
+      def mode(name)
+        jwriter.mode(name)
+        self
       end
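A similar sketch for the PATCH 3 additions, assuming the same `sql` context; table and column names are made up for illustration. Note that only #mode returns self and chains at the Ruby level — #insert_into, #save_as_table, and #partition_by delegate straight to the Java writer:

    df = sql.read.json('events.json')

    # 'overwrite', 'append', 'ignore', and 'error' are the save modes
    # the underlying Spark DataFrameWriter accepts.
    df.write.mode('overwrite').parquet('events.parquet')

    # Save as a table in the catalog, or append to an existing table.
    df.write.mode('overwrite').save_as_table('events')
    df.write.mode('append').insert_into('events')

    # Spark's partitionBy mutates and returns the same Java writer, so
    # set the partition column on a saved reference before writing.
    writer = df.write.mode('overwrite')
    writer.partition_by('date')
    writer.parquet('events_by_date')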