Extraction and loading are important parts of big data ETL operations. In this article, we focus on data ingestion operations, mainly with Sqoop and Flume. These operations are quite often used to transfer data between file systems e.g. HDFS, NoSQL databases e.g. HBase, SQL databases e.g. Hive, message queues e.g. Kafka, as well as other sources and sinks.
Package | Version |
---|---|
hadoop | 2.7.3 |
hive | 2.1.0 |
mariadb | 10.1.21 |
hbase | 1.2.2 |
sqoop | 1.4.6 |
flume | 1.8.0 |
Install Hadoop
Install Homebrew first if it is not already installed
ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
then install Hadoop
brew install hadoop
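A quick check that the installation succeeded:
hadoop version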
Configure Hadoop
Locate file /usr/local/Cellar/hadoop/2.7.3/libexec/etc/hadoop/hadoop-env.sh
and change or update the following line
export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc="
Locate file /usr/local/Cellar/hadoop/2.7.3/libexec/etc/hadoop/core-site.xml
and change or update the following lines
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/local/Cellar/hadoop/hdfs/tmp</value>
<description>A base for other temporary directories.</description>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
Locate file /usr/local/Cellar/hadoop/2.7.3/libexec/etc/hadoop/mapred-site.xml
and change or update the following lines
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9010</value>
</property>
</configuration>
Locate file /usr/local/Cellar/hadoop/2.7.3/libexec/etc/hadoop/hdfs-site.xml
and change or update the following lines
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
For convenience, add the following lines to ~/.profile
and load them with source ~/.profile
export HADOOP_HOME=/usr/local/Cellar/hadoop/2.7.3
export HIVE_HOME=/usr/local/Cellar/hive/2.1.0/libexec
alias hstart="/usr/local/Cellar/hadoop/2.7.3/sbin/start-dfs.sh;/usr/local/Cellar/hadoop/2.7.3/sbin/start-yarn.sh"
alias hstop="/usr/local/Cellar/hadoop/2.7.3/sbin/stop-yarn.sh;/usr/local/Cellar/hadoop/2.7.3/sbin/stop-dfs.sh"
export PATH=/usr/local/Cellar/hive/2.1.0/bin/:$PATH
Check and fix SSH connection
Check the SSH connection to localhost with the following commands (this assumes an RSA key pair already exists under ~/.ssh; see below if not)
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
ssh localhost
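If no key pair exists yet, generate one first; a standard invocation (-N "" sets an empty passphrase so ssh localhost works non-interactively):
ssh-keygen -t rsa -b 4096 -N "" -f ~/.ssh/id_rsa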
If port 22 is not open for SSH, fix it with the following command
sudo systemsetup -setremotelogin on
Format HDFS with hdfs namenode -format
and start Hadoop with hstart
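To verify that the daemons came up, list the running Java processes; the output should include NameNode, DataNode, SecondaryNameNode, ResourceManager and NodeManager:
jps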
Install MariaDB (a drop-in replacement for MySQL) and Hive as follows
brew install mariadb
mysql.server start
brew install hive
Download the driver class, in this case the JDBC driver mysql-connector-java-5.1.41-bin.jar. Copy the jar file to the Hive lib directory.
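A minimal sketch of the copy, assuming the connector jar was downloaded to ~/Downloads (adjust the path to wherever you saved it):
# copy the JDBC driver into Hive's lib directory ($HIVE_HOME is set in ~/.profile above)
cp ~/Downloads/mysql-connector-java-5.1.41-bin.jar $HIVE_HOME/lib/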
Set up the Hive metastore on MySQL. First set the root password and log in
mysqladmin -u root password 'pwd'
mysql -u root -p
then run the following SQL
CREATE DATABASE metastore;
USE metastore;
ALTER DATABASE metastore CHARACTER SET latin1 COLLATE latin1_swedish_ci;
CREATE USER 'hiveuser'@'localhost' IDENTIFIED BY 'hivepwd';
GRANT ALL PRIVILEGES ON *.* TO 'hiveuser'@'localhost' WITH GRANT OPTION;
then add the following properties to hive-site.xml under $HIVE_HOME/conf (create it from hive-default.xml.template if it does not exist)
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost/metastore</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hiveuser</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hivepwd</value>
</property>
<property>
<name>datanucleus.fixedDatastore</name>
<value>false</value>
</property>
<property>
<name>hive.exec.local.scratchdir</name>
<value>/tmp/hive</value>
<description>Local scratch space for Hive jobs</description>
</property>
<property>
<name>hive.downloaded.resources.dir</name>
<value>/tmp/hive</value>
<description>Temporary local directory for added resources in the remote file system.</description>
</property>
<property>
<name>hive.querylog.location</name>
<value>/tmp/hive</value>
<description>Location of Hive run time structured log file</description>
</property>
Initialize the metastore schema, create the Hive warehouse directories on HDFS, and start Hive
schematool -initSchema -dbType mysql
hdfs dfs -mkdir -p /user/hive/warehouse
hdfs dfs -chmod g+w /tmp
hdfs dfs -chmod g+w /user/hive/warehouse
hive
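A quick smoke test that the metastore is wired up correctly:
hive -e 'show databases;'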
Install HBase from Homebrew
brew install hbase
export PATH=/usr/local/Cellar/hbase/1.2.2/bin/:$PATH
alias hshell="hbase shell"
Edit the file /usr/local/Cellar/hbase/1.2.2/libexec/conf/hbase-site.xml
by adding the following property.
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://localhost:9000/hbase</value>
</property>
</configuration>
Start the HBase server with start-hbase.sh
and access it via shell hbase shell
Create a namespace and an empty table
create_namespace 'test'
create 'test:testtable', 'field1'
Complete code can be found on my Github:BigData:ETL:Sqoop
MySQL
Download the driver class, in this case the JDBC driver mysql-connector-java-5.1.41-bin.jar. Copy the jar file to the Sqoop lib/ directory.
Create a test database test and a test table testtable, and insert a couple of rows.
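A minimal sketch of that setup, run from the shell; the schema is an assumption, with column names chosen to match the HBase import later in this article:
mysql -u root -p <<'SQL'
CREATE DATABASE IF NOT EXISTS test;
USE test;
-- hypothetical schema; column1 doubles as the row key in the HBase import below
CREATE TABLE testtable (column1 INT PRIMARY KEY, column2 VARCHAR(64));
INSERT INTO testtable VALUES (1, 'foo'), (2, 'bar');
SQL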
MySQL to file ingestion: the following command will import the whole table from MySQL to HDFS.
sqoop import \
--connect jdbc:mysql://localhost/test \
--username root \
-P \
--table testtable \
--target-dir ./testtable_table \
-m 1
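To verify the import, inspect the part files Sqoop wrote under the target directory (with -m 1 there is a single part-m-00000):
hdfs dfs -ls testtable_table
hdfs dfs -cat testtable_table/part-m-00000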
MySQL to file ingestion: importing by running a SQL query inside Sqoop is quite often a more flexible approach.
sqoop import \
--connect jdbc:mysql://localhost/test \
--username root \
-P \
--query "select * from testtable where \$CONDITIONS" \
--target-dir ./testtable_query \
-m 1
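With more than one mapper, the query form additionally requires --split-by; a sketch assuming column1 is a numeric key and a hypothetical target directory:
sqoop import \
--connect jdbc:mysql://localhost/test \
--username root \
-P \
--query "select * from testtable where \$CONDITIONS" \
--split-by column1 \
--target-dir ./testtable_parallel \
-m 4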
Hive
Create a Hive table and import data from a CSV file. The HQL script can be found on my Github:BigDataETL:Hive.
-- drop table
drop table if exists ratings;
-- create table
create table ratings (
userID int,
movieID int,
rating int,
ts int)
row format delimited
fields terminated by ','
stored as textfile
-- ml-latest-small/ratings.csv ships with a header row; skip it on load
tblproperties("skip.header.line.count"="1");
-- load data from file
load data local inpath './ml-latest-small/ratings.csv' overwrite into table ratings;
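A quick sanity check on the loaded data:
hive -e 'select * from ratings limit 5;'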
MySQL to Hive ingestion: the following command will import the MySQL table defined in the previous section into a Hive table.
sqoop import \
--connect jdbc:mysql://localhost/test \
--username root \
-P \
--table testtable \
--hive-import \
--hive-database test \
--create-hive-table \
--hive-table testtable \
-m 1
Hive to MySQL ingestion: the following command will export the Hive table defined above into a MySQL table, whose structure must be defined prior to the ingestion (a sketch follows the command).
sqoop export \
--connect jdbc:mysql://localhost/test \
--username root \
--password pwd \
--table ratings \
--export-dir /user/hive/warehouse/test.db/ratings \
-m 1
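A minimal sketch for creating that target table, with column types mirroring the Hive schema above:
mysql -u root -p <<'SQL'
USE test;
-- target table for the Sqoop export; columns mirror the Hive ratings table
CREATE TABLE ratings (userID INT, movieID INT, rating INT, ts INT);
SQL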
HBase
MySQL to HBase ingestion: the following command will import the MySQL table defined above into an HBase table.
sqoop import \
--bindir ./ \
--connect jdbc:mysql://localhost/test \
--username root \
--password pwd \
--table testtable \
--columns "column1,column2" \
--hbase-table testtable \
--column-family f1 \
--hbase-row-key column1 \
-m 1 --verbose
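To verify the ingestion, scan the table from the HBase shell:
echo "scan 'testtable'" | hbase shell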
Complete code can be found on my Github:BigData:ETL:Flume
Save the following Flume ingestion configuration file as, for example, example.conf. The file defines an ingestion pipeline from the Twitter firehose to a Kafka topic: it constantly fetches data from Twitter and sends it to Kafka.
a1.sources = twitterS
a1.channels = memoryC
a1.sinks = kafkaD
# source : exec (unused alternative; only sources listed in a1.sources are active)
a1.sources.execS.type = exec
a1.sources.execS.command = tail -F /var/log/system.log
# source : twitter
a1.sources.twitterS.type = org.apache.flume.source.twitter.TwitterSource
a1.sources.twitterS.channels = memoryC
a1.sources.twitterS.consumerKey = ncMZ2CP7YmScHkLYwmfCYaTZz
a1.sources.twitterS.consumerSecret = ZkFEJXxXEOUlqkhrJ14kzWakrXjqIe11de7ks28DyC79P31t9q
a1.sources.twitterS.accessToken = 1157786504-XB3DXGrMmhvM1PAb6aeys3LJFYI9Y3LzS6veRHj
a1.sources.twitterS.accessTokenSecret = 8w69uDRm9PPA9iv3fNtkHPKP4FIq5SFtVbcE28wtcY5qx
a1.sources.twitterS.keywords = hadoop, kafka, spark, flume, storm, sqoop, yarn, mapr, mesos, hbase, hive, pig
a1.sources.twitterS.maxBatchSize = 10
a1.sources.twitterS.maxBatchDurationMillis = 200
# sink : kafka
a1.sinks.kafkaD.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaD.kafka.topic = flumetest
a1.sinks.kafkaD.kafka.bootstrap.servers = localhost:9092
a1.sinks.kafkaD.kafka.flumeBatchSize = 20
a1.sinks.kafkaD.kafka.producer.acks = 1
a1.sinks.kafkaD.kafka.producer.linger.ms = 1
a1.sinks.kafkaD.kafka.producer.compression.type = snappy
a1.channels.memoryC.type = memory
a1.channels.memoryC.capacity = 1000
a1.channels.memoryC.transactionCapacity = 100
a1.sources.execS.channels = memoryC
a1.sinks.kafkaD.channel = memoryC
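Before starting the agent, Kafka and its ZooKeeper must be running and the topic should exist. A sketch using the same Confluent 3.0.0 distribution referenced below (paths and the stock configuration files are assumptions):
# start ZooKeeper and the Kafka broker with the distribution's default configs
~/Codes/confluent-3.0.0/bin/zookeeper-server-start ~/Codes/confluent-3.0.0/etc/kafka/zookeeper.properties &
~/Codes/confluent-3.0.0/bin/kafka-server-start ~/Codes/confluent-3.0.0/etc/kafka/server.properties &
# create the topic the Flume sink writes to
~/Codes/confluent-3.0.0/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flumetest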
Execute the Flume ingestion and consume messages with the Kafka command line consumer
./flume-ng agent -n a1 -c conf -f ~/Codes/bigdata_etl/etl_flume/config/flume-example.conf -Dflume.root.logger=DEBUG,console
~/Codes/confluent-3.0.0/bin/kafka-console-consumer --zookeeper localhost:2181 --topic flumetest
Extraction and loading are parts of the ETL operation that are very often seen when building industry-level big data processing pipelines. In this article, we walked through some ingestion operations, mostly via Sqoop and Flume. These operations aim at transferring data between file systems e.g. HDFS, NoSQL databases e.g. HBase, SQL databases e.g. Hive, message queues e.g. Kafka, and other sources or sinks.
Hongyu Su 01 March 2017 Helsinki