Welcome toVigges Developer Community-Open, Learning,Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.7k views
in Technique[技术] by (71.8m points)

apache spark - What is the best possible way of interacting with Hbase using Pyspark

I am using pyspark [spark2.3.1] and Hbase1.2.1, I am wondering what could be the best possible way of accessing Hbase using pyspark?

I did some initial level of search and found that there are few options available like using shc-core:1.1.1-2.1-s_2.11.jar this could be achieved, but whereever I try to look for some example, at most of the places code is written in Scala or examples are also scala based. I tried implementing basic code in pyspark:

from pyspark import SparkContext
from pyspark.sql import SQLContext

def main():
    sc = SparkContext()
    sqlc = SQLContext(sc)
    data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"firsttable"},
        "rowkey":"key",
        "columns":{
            "firstcol":{"cf":"rowkey", "col":"key", "type":"string"},
            "secondcol":{"cf":"d", "col":"colname", "type":"string"}
        }
    }""".split())
    df = sqlc.read.options(catalog=catalog).format(data_source_format).load()
    df.select("secondcol").show()

# entry point for PySpark application
if __name__ == '__main__':
    main()

and running it using:

spark-submit  --master yarn-client --files /opt/hbase-1.1.2/conf/hbase-site.xml --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11  --jars /home/ubuntu/hbase-spark-2.0.0-alpha4.jar HbaseMain2.py

It is returning me blank output:

+---------+
|secondcol|
+---------+
+---------+

I am not sure what am I doing wrong? Also not sure what would be the best approach of doing this??

Any references would be appreciated.

Regards

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Finally, Using SHC, I am able to connect to HBase-1.2.1 with Spark-2.3.1 using pyspark code. Following is my work:

  • All my hadoop [namenode, datanode, nodemanager, resourcemanager] & hbase [Hmaster, HRegionServer, HQuorumPeer] deamons were up and running on my EC2 instance.

  • I placed emp.csv file at hdfs location /test/emp.csv, with data:

key,empId,empName,empWeight
1,"E007","Bhupesh",115.10
2,"E008","Chauhan",110.23
3,"E009","Prithvi",90.0
4,"E0010","Raj",80.0
5,"E0011","Chauhan",100.0
  • I created readwriteHBase.py file with following line of code [for reading emp.csv file from HDFS, then creating tblEmployee first in HBase, pushing the data into tblEmployee then once again reading some data from the same table and displaying it on console]:

    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession.builder.master("yarn-client").appName("HelloSpark").getOrCreate()
    
        dataSourceFormat = "org.apache.spark.sql.execution.datasources.hbase"
        writeCatalog = ''.join("""{
                    "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
                    "rowkey":"key",
                    "columns":{
                      "key":{"cf":"rowkey", "col":"key", "type":"int"},
                      "empId":{"cf":"personal","col":"empId","type":"string"},
                      "empName":{"cf":"personal", "col":"empName", "type":"string"},
                      "empWeight":{"cf":"personal", "col":"empWeight", "type":"double"}
                    }
                  }""".split())
    
        writeDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/test/emp.csv")
        print("csv file read", writeDF.show())
        writeDF.write.options(catalog=writeCatalog, newtable=5).format(dataSourceFormat).save()
        print("csv file written to HBase")
    
        readCatalog = ''.join("""{
                    "table":{"namespace":"default", "name":"tblEmployee"},
                    "rowkey":"key",
                    "columns":{
                      "key":{"cf":"rowkey", "col":"key", "type":"int"},
                      "empId":{"cf":"personal","col":"empId","type":"string"},
                      "empName":{"cf":"personal", "col":"empName", "type":"string"}
                    }
                  }""".split())
    
        print("going to read data from Hbase table")
        readDF = spark.read.options(catalog=readCatalog).format(dataSourceFormat).load()
        print("data read from HBase table")
        readDF.select("empId", "empName").show()
        readDF.show()
    
    # entry point for PySpark application
    if __name__ == '__main__':
        main()
    
  • Ran this script on VM console using command:

    spark-submit --master yarn-client --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://nexus-private.hortonworks.com/nexus/content/repositories/IN-QA/ readwriteHBase.py
    
  • Intermediate Result: After reading CSV file:

    +---+-----+-------+---------+
    |key|empId|empName|empWeight|
    +---+-----+-------+---------+
    |  1| E007|Bhupesh|    115.1|
    |  2| E008|Chauhan|   110.23|
    |  3| E009|Prithvi|     90.0|
    |  4|E0010|    Raj|     80.0|
    |  5|E0011|Chauhan|    100.0|
    +---+-----+-------+---------+
    
  • Final Output : after reading data from HBase table:

    +-----+-------+
    |empId|empName|
    +-----+-------+
    | E007|Bhupesh|
    | E008|Chauhan|
    | E009|Prithvi|
    |E0010|    Raj|
    |E0011|Chauhan|
    +-----+-------+
    

Note: While creating Hbase table and inserting data into HBase table it expects NumberOfRegions should be greater than 3, hence I have added options(catalog=writeCatalog, newtable=5) while adding data to HBase


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to Vigges Developer Community for programmer and developer-Open, Learning and Share
...