I. Prepare the environment: create the Kafka topic and the HBase table
1. Create a Kafka topic in a Kerberos environment
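1.1 Kafka clients use the PLAINTEXT protocol by default. In a Kerberos environment the protocol has to be changed, so add the following line to both ${KAFKA_HOME}/config/producer.properties and ${KAFKA_HOME}/config/consumer.properties:
security.protocol=SASL_PLAINTEXT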
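When a Java client is used instead of the console tools, the same setting goes into the client's Properties. The sketch below builds those settings and checks a loaded .properties file. The class and method names are hypothetical. The keys `bootstrap.servers`, `security.protocol` and `sasl.kerberos.service.name` are standard Kafka client configs. SASL_PLAINTEXT means Kerberos (SASL/GSSAPI) authentication over an unencrypted channel. The service name can also come from the `serviceName` option in the JAAS file, so setting it here is assumed to be redundant but harmless.

```java
import java.util.Properties;

// Hypothetical helper mirroring what producer.properties / consumer.properties
// carry on a Kerberized cluster. Only builds/inspects Properties; no Kafka jars needed.
final class KerberosKafkaClientProps {
    private KerberosKafkaClientProps() {}

    /** Client settings for a Kerberized broker list such as "host1:6667,host2:6667". */
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Same line that is added to the .properties files: SASL auth, no TLS.
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        // Primary of the broker principal (kafka/<host>@REALM); matches serviceName in the JAAS file.
        props.setProperty("sasl.kerberos.service.name", "kafka");
        return props;
    }

    /** True if the loaded client config uses a SASL-based protocol. */
    static boolean isSaslEnabled(Properties props) {
        String protocol = props.getProperty("security.protocol", "PLAINTEXT");
        return protocol.equals("SASL_PLAINTEXT") || protocol.equals("SASL_SSL");
    }
}
```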
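1.2 Before running the Kafka command-line tools, add the JAAS option to the KAFKA_OPTS environment variable. Otherwise Kafka cannot find its Kerberos (keytab) login settings:
export KAFKA_OPTS="$KAFKA_OPTS -Djava.security.auth.login.config=/usr/ndp/current/kafka_broker/conf/kafka_jaas.conf"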
The contents of kafka_jaas.conf are shown below. The KafkaServer and Client (ZooKeeper) sections log in from the kafka service keytab. The KafkaClient section, which the console tools use, instead reads the Kerberos ticket cache (useTicketCache=true), so the tools authenticate with the ticket obtained by an earlier kinit:
cat /usr/ndp/current/kafka_broker/conf/kafka_jaas.conf
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/kafka.service.keytab"
    storeKey=true
    useTicketCache=false
    serviceName="kafka"
    principal="kafka/hzadg-mammut-platform3.server.163.org@BDMS.163.COM";
};
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    renewTicket=true
    serviceName="kafka";
};
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/kafka.service.keytab"
    storeKey=true
    useTicketCache=false
    serviceName="zookeeper"
    principal="kafka/hzadg-mammut-platform3.server.163.org@BDMS.163.COM";
};
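JAAS files are easy to break by hand, for example with a missing semicolon or an unbalanced brace. One way to check a file before using it is to parse it with the JDK's own JAAS parser. The sketch below assumes the default "JavaLoginConfig" configuration type that ships with the JDK. The class name is hypothetical. Parsing only reads the file; it does not contact a KDC and does not load Krb5LoginModule.

```java
import java.net.URI;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical pre-flight check: parse a JAAS file with the JDK's built-in
// "JavaLoginConfig" parser and inspect one section (e.g. "KafkaClient").
final class JaasCheck {
    private JaasCheck() {}

    private static AppConfigurationEntry[] entries(Path jaasFile, String section)
            throws NoSuchAlgorithmException {
        URI uri = jaasFile.toUri();
        // A malformed file makes this call throw (exact exception type may vary by JDK).
        Configuration cfg = Configuration.getInstance("JavaLoginConfig", new URIParameter(uri));
        return cfg.getAppConfigurationEntry(section); // null if the section is absent
    }

    /** Options of the first login module in the section, or null if the section is missing. */
    static Map<String, ?> optionsOf(Path jaasFile, String section) throws NoSuchAlgorithmException {
        AppConfigurationEntry[] e = entries(jaasFile, section);
        return (e == null || e.length == 0) ? null : e[0].getOptions();
    }

    /** Class name of the first login module in the section, or null if the section is missing. */
    static String moduleOf(Path jaasFile, String section) throws NoSuchAlgorithmException {
        AppConfigurationEntry[] e = entries(jaasFile, section);
        return (e == null || e.length == 0) ? null : e[0].getLoginModuleName();
    }
}
```

For example, `JaasCheck.optionsOf(path, "KafkaClient").get("serviceName")` should return "kafka" for the file above.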
1.3 Create a new topic:
bin/kafka-topics.sh --create --zookeeper hzadg-mammut-platform2.server.163.org:2181,hzadg-mammut-platform3.server.163.org:2181 --replication-factor 1 --partitions 1 --topic spark-test
1.4 Start a console producer:
bin/kafka-console-producer.sh --broker-list hzadg-mammut-platform2.server.163.org:6667,hzadg-mammut-platform3.server.163.org:6667,hzadg-mammut-platform4.server.163.org:6667 --topic spark-test --producer.config ./config/producer.properties
1.5 Test with a console consumer:
bin/kafka-console-consumer.sh --zookeeper hzadg-mammut-platform2.server.163.org:2181,hzadg-mammut-platform3.server.163.org:2181 --bootstrap-server hzadg-mammut-platform2.server.163.org:6667 --topic spark-test --from-beginning --new-consumer --consumer.config ./config/consumer.properties
2. Create the HBase table
2.1 Run kinit as the hbase service principal first; otherwise the HBase table cannot be created:
kinit -kt /etc/security/keytabs/hbase.service.keytab hbase/hzadg-mammut-platform2.server.163.org@BDMS.163.COM
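The principals in this guide have the form primary/instance@REALM. Here "hbase" or "kafka" is the service, the instance is a host name, and BDMS.163.COM is the realm. A service keytab usually holds the principal for the host it was generated on. That is likely why the host part differs between steps (platform2 here, platform1 in spark-submit, platform3 in kafka_jaas.conf). `klist -kt <keytab>` lists the principals a keytab actually contains. The hypothetical parser below only illustrates the structure and does not handle escaped characters.

```java
// Hypothetical helper: splits a Kerberos principal "primary/instance@REALM".
// Escaped '/' or '@' characters are not handled in this sketch.
final class KerberosPrincipalParts {
    final String primary;  // service or user name, e.g. "hbase"
    final String instance; // usually the host for service principals; empty for user principals
    final String realm;    // e.g. "BDMS.163.COM"

    private KerberosPrincipalParts(String primary, String instance, String realm) {
        this.primary = primary;
        this.instance = instance;
        this.realm = realm;
    }

    static KerberosPrincipalParts parse(String principal) {
        int at = principal.lastIndexOf('@');
        if (at <= 0 || at == principal.length() - 1) {
            throw new IllegalArgumentException("expected name@REALM: " + principal);
        }
        String name = principal.substring(0, at);
        String realm = principal.substring(at + 1);
        int slash = name.indexOf('/');
        String primary = slash < 0 ? name : name.substring(0, slash);
        String instance = slash < 0 ? "" : name.substring(slash + 1);
        return new KerberosPrincipalParts(primary, instance, realm);
    }
}
```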
./bin/hbase shell
> create 'recsys_logs', 'f'
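II. Write the Spark code
Write a simple Spark Java program that consumes messages from Kafka and writes the number of messages counted in each batch to HBase. For details, refer to the project code: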
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.netease.spark.streaming.hbase;

import com.netease.spark.utils.Consts;
import com.netease.spark.utils.JConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class JavaKafkaToHBaseKerberos {
    private final static Logger LOGGER = LoggerFactory.getLogger(JavaKafkaToHBaseKerberos.class);

    private static HConnection connection = null;
    private static HTableInterface table = null;

    // Opens one shared HBase connection and table handle on the driver.
    public static void openHBase(String tablename) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        synchronized (HConnection.class) {
            if (connection == null)
                connection = HConnectionManager.createConnection(conf);
        }
        synchronized (HTableInterface.class) {
            if (table == null) {
                table = connection.getTable(tablename);
            }
        }
    }

    public static void closeHBase() {
        if (table != null)
            try {
                table.close();
            } catch (IOException e) {
                LOGGER.error("Failed to close table", e);
            }
        if (connection != null)
            try {
                connection.close();
            } catch (IOException e) {
                LOGGER.error("Failed to close connection", e);
            }
    }

    public static void main(String[] args) throws Exception {
        String hbaseTable = JConfig.getInstance().getProperty(Consts.HBASE_TABLE);
        String kafkaBrokers = JConfig.getInstance().getProperty(Consts.KAFKA_BROKERS);
        String kafkaTopics = JConfig.getInstance().getProperty(Consts.KAFKA_TOPICS);
        String kafkaGroup = JConfig.getInstance().getProperty(Consts.KAFKA_GROUP);

        // Open HBase
        try {
            openHBase(hbaseTable);
        } catch (IOException e) {
            LOGGER.error("Failed to open HBase connection", e);
            System.exit(-1);
        }

        SparkConf conf = new SparkConf().setAppName("JavaKafkaToHBase");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(kafkaTopics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", kafkaBrokers);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", kafkaGroup);
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);
        // Required in a Kerberos environment
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");

        // Create a direct Kafka stream with the given brokers and topics
        final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));

        // Keep only the non-empty message values
        JavaDStream<String> lines = stream.map(new Function<ConsumerRecord<String, String>, String>() {
            private static final long serialVersionUID = -1801798365843350169L;

            @Override
            public String call(ConsumerRecord<String, String> record) {
                return record.value();
            }
        }).filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 7786877762996470593L;

            @Override
            public Boolean call(String msg) throws Exception {
                return msg.length() > 0;
            }
        });

        // One count per batch
        JavaDStream<Long> nums = lines.count();
        // The function passed to foreachRDD runs on the driver, so the static table opened above is usable here
        nums.foreachRDD(new VoidFunction<JavaRDD<Long>>() {
            private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");

            @Override
            public void call(JavaRDD<Long> rdd) throws Exception {
                // count() yields exactly one element per batch (0 for an empty batch)
                Long num = rdd.take(1).get(0);
                String ts = sdf.format(new Date());
                // Row key: current time; column f:nums holds the count as an 8-byte long
                Put put = new Put(Bytes.toBytes(ts));
                put.add(Bytes.toBytes("f"), Bytes.toBytes("nums"), Bytes.toBytes(num));
                table.put(put);
            }
        });

        ssc.start();
        ssc.awaitTermination();
        closeHBase();
    }
}
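The per-batch logic can be checked without a cluster. The sketch below uses only the JDK and mirrors one batch of the job: keep non-empty values, count them, and build the row key and cell value. It assumes that HBase's `Bytes.toBytes(long)` produces 8 big-endian bytes, which is also `ByteBuffer`'s default byte order. The class name is hypothetical, and the sketch treats a null value as empty where the job would throw.

```java
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

// Hypothetical, Spark-free sketch of one micro-batch of JavaKafkaToHBaseKerberos.
final class BatchCountSketch {
    private BatchCountSketch() {}

    /** Like map(record.value()).filter(len > 0).count(); nulls count as empty here. */
    static long countNonEmpty(List<String> values) {
        return values.stream().filter(v -> v != null && !v.isEmpty()).count();
    }

    /** Same layout as Bytes.toBytes(long): 8 bytes, big-endian. */
    static byte[] encodeCount(long n) {
        return ByteBuffer.allocate(Long.BYTES).putLong(n).array();
    }

    /** Inverse of encodeCount, for reading the f:nums cell back. */
    static long decodeCount(byte[] cell) {
        return ByteBuffer.wrap(cell).getLong();
    }

    /** Row key format used by the job: second-resolution wall-clock time. */
    static String rowKey(Date when) {
        return new SimpleDateFormat("yyyyMMdd HH:mm:ss").format(when);
    }
}
```

Because the value is a raw long, `scan 'recsys_logs'` in the hbase shell shows it as bytes, for example `\x00\x00\x00\x00\x00\x00\x00\x05` for a count of 5. The row key is the wall-clock time at write time, not the batch time. If processing falls behind and two batches are written within the same second, the later write becomes a new cell version of the same row instead of a new row.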
III. Build and run on YARN
3.1 Change to the project directory and build the project:
mvn clean package
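3.2 Run the job on Spark
- The executors read from Kafka, so the Kerberos identity that Kafka has authorized (its JAAS file and keytab) must be shipped to the executors;
- HDFS on this cluster is also Kerberized, so spark.yarn.keytab / spark.yarn.principal must point to a keytab that can access HDFS and HBase.
Run the following from the project directory: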
/usr/ndp/current/spark2_client/bin/spark-submit \
  --files ./kafka_client_jaas.conf,./kafka.service.keytab \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
  --driver-java-options "-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
  --conf spark.yarn.keytab=/etc/security/keytabs/hbase.service.keytab \
  --conf spark.yarn.principal=hbase/hzadg-mammut-platform1.server.163.org@BDMS.163.COM \
  --class com.netease.spark.streaming.hbase.JavaKafkaToHBaseKerberos \
  --master yarn \
  --deploy-mode client \
  ./target/spark-demo-0.1.0-jar-with-dependencies.jar
The contents of kafka_client_jaas.conf are:
cat kafka_client_jaas.conf
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    renewTicket=true
    keyTab="./kafka.service.keytab"
    storeKey=true
    useTicketCache=false
    serviceName="kafka"
    principal="kafka/hzadg-mammut-platform1.server.163.org@BDMS.163.COM";
};
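The keyTab path in kafka_client_jaas.conf is relative on purpose. YARN places files passed with --files into each container's working directory, so on the executors "./kafka.service.keytab" resolves only if that file name was shipped. The `file#alias` form of --files renames the localized copy. In client mode the driver also consumes from Kafka and resolves the same relative path against the directory where spark-submit runs, which is why the command is run from the project directory. Below is a hypothetical pre-flight check (names are illustrative, not part of Spark) that the keyTab option matches a shipped file.

```java
// Hypothetical pre-flight check: does the JAAS keyTab option point at a file
// that --files will localize into the YARN container's working directory?
final class ShippedKeytabCheck {
    private ShippedKeytabCheck() {}

    /** Name a --files entry gets in the container: the "#alias" if given, else the last path segment. */
    static String localizedName(String filesEntry) {
        String e = filesEntry.trim();
        int hash = e.lastIndexOf('#');
        if (hash >= 0) {
            return e.substring(hash + 1);
        }
        return e.substring(e.lastIndexOf('/') + 1);
    }

    /** keyTabOption: value of keyTab in the JAAS file; filesArg: comma-separated --files value. */
    static boolean keytabIsShipped(String keyTabOption, String filesArg) {
        String name = keyTabOption.startsWith("./") ? keyTabOption.substring(2) : keyTabOption;
        // Absolute or nested paths are not where YARN puts --files, so they fail the check.
        if (name.isEmpty() || name.contains("/")) {
            return false;
        }
        for (String entry : filesArg.split(",")) {
            if (!entry.trim().isEmpty() && localizedName(entry).equals(name)) {
                return true;
            }
        }
        return false;
    }
}
```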
3.3 Results