Import Data from RDBMS/Oracle into Hive using Spark/Scala

In this blog, I will explain how to import data from an RDBMS (Oracle) into Hive using Spark/Scala.

package com

import java.util.Properties
import org.apache.spark.SparkContext

object importExadata {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "importExadata")
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    hiveContext.sql("SET hive.exec.dynamic.partition = true")
    hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
    hiveContext.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
    hiveContext.sql("set hive.execution.engine=spark")

    // Oracle JDBC connection details
    val url = "jdbc:oracle:thin:@<hostname>:<port>:<sid>"
    val prop = new Properties
    prop.setProperty("user", "<user_name>")
    prop.setProperty("password", "<password>")
    prop.setProperty("driver", "oracle.jdbc.OracleDriver")

    // Spark 1.6: read from Oracle via the generic JDBC data source.
    // Note: lowerBound/upperBound/numPartitions only take effect when partitionColumn is also set.
    val df1 = hiveContext.read.format("jdbc")
      .option("url", url)
      .option("dbtable", "(select * from <dbname>.<tablename>)")
      .option("user", "<user_name>")
      .option("password", "<password>")
      .option("driver", "oracle.jdbc.OracleDriver")
      .option("partitionColumn", "<partition_column>")
      .option("lowerBound", "1")
      .option("upperBound", "10000")
      .option("numPartitions", "1")
      .option("fetchSize", "1000")
      .load()

    // Spark 2.0 style (dbname and table_name refer to the RDBMS database and table)
    val df2 = hiveContext.read.jdbc(url, "<dbname>.<table_name>", prop)

    // (dbname and table_name here refer to the Hive database and table)
    df2.write.mode("append").format("orc").insertInto("<dbname>.<table_name>")

    sc.stop()
  }
}
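For the insertInto call above to work, the target Hive table must already exist. The sketch below shows one way to create it through the same HiveContext; the column names and types are hypothetical placeholders, so match them to your source table.

// Hypothetical sketch: the ORC-backed Hive table that insertInto targets
hiveContext.sql(
  """CREATE TABLE IF NOT EXISTS <dbname>.<table_name> (
    |  id     BIGINT,
    |  name   STRING,
    |  amount DECIMAL(18,2)
    |) STORED AS ORC""".stripMargin)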

Convert Sequence File to Parquet using Spark/Scala

In this blog, I will detail the code for converting a sequence file to Parquet using Spark/Scala. Suppose your existing Hive table is in sequential format and partitioned by year and month. The job reads the database name, table name, partition year range, and output path from an input file, with the fields separated by ~ (see the sample line below). Once the data is converted to Parquet format, create an external table with the same structure as the sequential table, but stored as Parquet and pointing to the output path (a sketch follows the code). In my experience, the approach below is about 10 times faster than doing the conversion through Spark SQL alone.
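For illustration, one line of the input file could look like this (the database, table, years, and path are hypothetical):

mydb~sales~2016~2017~/user/hive/output/sales_parquet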

package com

import org.apache.spark.SparkContext
import scala.io.Source

object sequencetoparquet {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "SeqtoParquet")
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    // each line of the input file: dbname~tablename~start_year~end_year~output_path
    val lines = Source.fromFile(args(1)).getLines.toList
    for (tables <- lines) {
      val fields = tables.split("~")
      val dbname = fields(0)
      val tab = fields(1)
      val newyear = fields(2)
      val year = fields(3)
      val outputpath = fields(4)

      val rdd1 = hiveContext.sql(s"select * from $dbname.$tab where year between '$newyear' and '$year'")
      rdd1.write.partitionBy("year", "month").format("parquet")
        .option("compression", "snappy").mode("append").save(outputpath)
    }
    sc.stop()
  }
}
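After the job finishes, the external Parquet table mentioned above can be created through the same HiveContext. This is a minimal sketch: the table name, columns, and path are hypothetical placeholders and should mirror the schema of the original sequential table.

// Hypothetical sketch: external Parquet table over the converted data
hiveContext.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS <dbname>.<tablename>_parquet (
    |  id   BIGINT,
    |  name STRING
    |)
    |PARTITIONED BY (year STRING, month STRING)
    |STORED AS PARQUET
    |LOCATION '<output_path>'""".stripMargin)
// pick up the partitions written by the job
hiveContext.sql("MSCK REPAIR TABLE <dbname>.<tablename>_parquet")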

Convert ORC to Sequence File using Spark/Scala

In my previous blog, https://wordpress.com/post/aikanshbigdatablogs.wordpress.com/450, I explained how to convert a sequence file to ORC. In this one, I will detail the code for the reverse: converting ORC to sequential format. In the code below, I read the ORC files and convert them to sequential format with the fields separated by "|". After a successful conversion, create a Hive table in sequential format, with fields separated by "|", pointing to the output path (a sketch follows the code).

package com

import org.apache.spark.SparkContext
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.BZip2Codec

object orctoseq1 {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "orctoseq")

    val hadoopConf = new org.apache.hadoop.conf.Configuration()
    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)

    // put your hdfs input path (the directory holding the ORC files)
    val inputpath = "hdfs://<path_goes_here>"

    val exists = fs.exists(new org.apache.hadoop.fs.Path(s"$inputpath/"))
    if (exists) {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      val df = sqlContext.read.format("orc").load(s"$inputpath/*")

      // write each row as a "|"-delimited Text value in a BZip2-compressed sequence file
      df.rdd.repartition(10)
        .map(f => (new BytesWritable(), new Text(f.toSeq.mkString("|"))))
        .saveAsSequenceFile(s"${inputpath}_temp/", Some(classOf[BZip2Codec]))

      // swap the temporary output in place of the original directory
      val tempexists = fs.exists(new org.apache.hadoop.fs.Path(s"${inputpath}_temp/_SUCCESS"))
      if (tempexists) {
        fs.delete(new org.apache.hadoop.fs.Path(s"$inputpath/"), true)
        fs.rename(new org.apache.hadoop.fs.Path(s"${inputpath}_temp/"), new org.apache.hadoop.fs.Path(s"$inputpath/"))
        fs.delete(new org.apache.hadoop.fs.Path(s"$inputpath/_SUCCESS"), false)
      }
    }
    sc.stop()
  }
}
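The Hive table over the converted output, as described above, could then be defined along these lines. This is a hypothetical sketch; the table name, columns, and path are placeholders, and the HiveContext is created here only for the DDL.

// Hypothetical sketch: "|"-delimited sequence-file table over the converted output
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS <dbname>.<tablename>_seq (
    |  id   BIGINT,
    |  name STRING
    |)
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
    |STORED AS SEQUENCEFILE
    |LOCATION 'hdfs://<path_goes_here>'""".stripMargin)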

Export data to Oracle Exadata (RDBMS) from Hive using Spark/Scala

Suppose you have an employee table in Hive with name, age and gender columns, and you need to export it to Oracle Exadata using Spark/Scala.

Create a table with a similar structure in Exadata (e.g. an employee table with the same column names and corresponding datatypes), as sketched below, and then follow the Spark code.
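For reference, the Exadata-side table could be created over JDBC as sketched here. The datatypes and connection details are assumptions, so adjust them to your schema.

import java.sql.DriverManager

// Hypothetical sketch: create the target table in Oracle/Exadata over JDBC
Class.forName("oracle.jdbc.OracleDriver")
val conn = DriverManager.getConnection("jdbc:oracle:thin:@ip_of_host:port:sid", "username", "password")
try {
  val stmt = conn.createStatement()
  stmt.execute(
    """CREATE TABLE employee (
      |  name   VARCHAR2(100),
      |  age    NUMBER(3),
      |  gender VARCHAR2(10)
      |)""".stripMargin)
  stmt.close()
} finally {
  conn.close()
}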

 

package com

import java.util.Properties
import org.apache.spark.SparkContext

object exportToExadata {
  def main(args: Array[String]) {

    // provide ip of host, port number and its sid in the url below
    val url = "jdbc:oracle:thin:@ip_of_host:port:sid"
    val prop = new Properties

    // provide username and password
    prop.setProperty("user", "username")
    prop.setProperty("password", "password")
    prop.setProperty("driver", "oracle.jdbc.OracleDriver")

    val sc = new SparkContext(args(0), "exadata")
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    hiveContext.sql("SET hive.exec.dynamic.partition = true")
    hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
    hiveContext.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
    hiveContext.sql("set hive.execution.engine=spark")

    val rdd1 = hiveContext.sql("select name,age,gender from employee")
    // export only if the Hive table returned any rows
    if (rdd1.count() > 0) {
      rdd1.write.mode("append").jdbc(url, "employee", prop)
    }
    sc.stop()
  }
}
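As a quick sanity check (not part of the original post), you could read the Oracle table back through the same connection properties and compare row counts:

// Hypothetical check: read the exported table back from Oracle and print its row count
val exported = hiveContext.read.jdbc(url, "employee", prop)
println("rows in Oracle employee table: " + exported.count())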

Convert Sequence File to ORC using Spark/Scala

In this blog, I will detail the code for converting a sequence file to ORC using Spark/Scala. Suppose your existing Hive table is in sequential format and partitioned by year and month. The job reads the database name, table name, partition year range, and output path from an input file, with the fields separated by ~. Once the data is converted to ORC format, create an external table with the same structure as the sequential table, but stored as ORC and pointing to the output path (a sketch follows the code). In my experience, the approach below is about 10 times faster than doing the conversion through Spark SQL alone.

package com

import org.apache.spark.SparkContext
import scala.io.Source

object sequencetoorc {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "SeqtoOrc")
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    // each line of the input file: dbname~tablename~start_year~end_year~output_path
    val lines = Source.fromFile(args(1)).getLines.toList
    for (tables <- lines) {
      val fields = tables.split("~")
      val dbname = fields(0)
      val tab = fields(1)
      val newyear = fields(2)
      val year = fields(3)
      val outputpath = fields(4)

      val rdd1 = hiveContext.sql(s"select * from $dbname.$tab where year between '$newyear' and '$year'")
      rdd1.write.partitionBy("year", "month").format("orc")
        .option("compression", "snappy").mode("append").save(outputpath)
    }
    sc.stop()
  }
}
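As with the Parquet version above, the external ORC table can then be defined over the output path. This is a hypothetical sketch; mirror the columns of your sequential table.

// Hypothetical sketch: external ORC table over the converted data
hiveContext.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS <dbname>.<tablename>_orc (
    |  id   BIGINT,
    |  name STRING
    |)
    |PARTITIONED BY (year STRING, month STRING)
    |STORED AS ORC
    |LOCATION '<output_path>'""".stripMargin)
hiveContext.sql("MSCK REPAIR TABLE <dbname>.<tablename>_orc")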

Merge Sequence File using Spark-Scala

The code below merges the small sequence files under an HDFS directory into larger files of roughly 100 MB each, recompresses them with BZip2, and swaps the merged output in place of the original directory once the job succeeds.

package com

import org.apache.spark.SparkContext
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.BZip2Codec

object mergemwch {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "merge")

    val hadoopConf = new org.apache.hadoop.conf.Configuration()
    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)

    // put your hdfs input path
    val inputpath = "hdfs://<path_goes_here>"

    val exists = fs.exists(new org.apache.hadoop.fs.Path(s"$inputpath"))
    if (exists) {
      val rddinput = sc.sequenceFile(s"$inputpath/*", classOf[BytesWritable], classOf[Text], 15)

      // aim for output files of roughly 100 MB each (at least one partition)
      val summary = fs.getContentSummary(new org.apache.hadoop.fs.Path(s"$inputpath"))
      val length = summary.getLength
      val numFiles = math.max(1L, length / (1024 * 1024 * 100))

      rddinput.repartition(numFiles.toInt).saveAsSequenceFile(s"${inputpath}_temp/", Some(classOf[BZip2Codec]))

      // replace the original directory with the merged output
      val tempexists = fs.exists(new org.apache.hadoop.fs.Path(s"${inputpath}_temp/_SUCCESS"))
      if (tempexists) {
        fs.delete(new org.apache.hadoop.fs.Path(s"$inputpath/"), true)
        fs.rename(new org.apache.hadoop.fs.Path(s"${inputpath}_temp/"), new org.apache.hadoop.fs.Path(s"$inputpath"))
        fs.delete(new org.apache.hadoop.fs.Path(s"$inputpath/_SUCCESS"), false)
      }
    }
    sc.stop()
  }
}
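If you would rather pass the HDFS path on the command line, as the other jobs in this blog do with args(1), a small variation (an assumption, not in the original code) is:

// Hypothetical variation: take the HDFS directory as the second program argument
val inputpath = args(1)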

Read and Write Parquet file using Spark/Scala

The small standalone example below creates an RDD of integers, converts it to a DataFrame, writes it out as Parquet, and reads it back.

package com

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Parquet extends App {
  // needed only when running locally on Windows
  System.setProperty("hadoop.home.dir", "c://winutil//")
  val conf = new SparkConf().setMaster("local[2]").setAppName("auction")
  val sc = new SparkContext(conf)

  val data = Array(1, 2, 3, 4, 5)      // create an Array of Integers
  val dataRDD = sc.parallelize(data)   // create an RDD

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._
  val dataDF = dataRDD.toDF()          // convert the RDD to a DataFrame
  dataDF.write.parquet("data.parquet") // write to Parquet

  val newDataDF = sqlContext.read.parquet("data.parquet") // read the Parquet file back into a DataFrame
  newDataDF.show()                     // show its contents
  sc.stop()
}