Consuming Kafka with Spark and Writing to HBase in a Kerberos Environment


I. Prepare the environment: create the Kafka topic and the HBase table

1. Create the Kafka topic in a Kerberos environment

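1.1 Kafka uses the PLAINTEXT protocol by default, so in a Kerberos environment the communication protocol must be changed. Add the following line to both ${KAFKA_HOME}/config/producer.properties and ${KAFKA_HOME}/config/consumer.properties: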

security.protocol=SASL_PLAINTEXT
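
For clients that are configured in code rather than through the .properties files, the same setting can be passed programmatically. The following is a minimal sketch and not part of the original project: the class name KerberosKafkaClientProps and the helper secureClientProps are hypothetical, and setting sasl.kerberos.service.name here is assumed to be necessary only when the JAAS file does not already define serviceName.

```java
import java.util.Properties;
import java.util.TreeSet;

public class KerberosKafkaClientProps {

  // Hypothetical helper: builds the minimum client settings for a
  // Kerberos-secured (SASL_PLAINTEXT) Kafka cluster.
  static Properties secureClientProps(String bootstrapServers) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", bootstrapServers);
    // Same value as the line added to producer.properties / consumer.properties.
    props.setProperty("security.protocol", "SASL_PLAINTEXT");
    // Assumption: the broker principal starts with "kafka/", as in kafka_jaas.conf.
    props.setProperty("sasl.kerberos.service.name", "kafka");
    return props;
  }

  public static void main(String[] args) {
    // The broker address below is a placeholder, not a value from the article.
    Properties props = secureClientProps("broker1.example.com:6667");
    // Print the keys in sorted order so the output is stable.
    for (String key : new TreeSet<>(props.stringPropertyNames())) {
      System.out.println(key + "=" + props.getProperty(key));
    }
  }
}
```

The resulting Properties object can be handed to a producer or consumer constructor, provided the JVM is also started with -Djava.security.auth.login.config pointing at a JAAS file.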

1.2 Before running any Kafka command, add the JAAS setting to the KAFKA_OPTS environment variable; otherwise Kafka cannot use the keytab:

export KAFKA_OPTS="$KAFKA_OPTS -Djava.security.auth.login.config=/usr/ndp/current/kafka_broker/conf/kafka_jaas.conf"

The contents of kafka_jaas.conf are as follows:

cat /usr/ndp/current/kafka_broker/conf/kafka_jaas.conf

KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/kafka.service.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="kafka/hzadg-mammut-platform3.server.163.org@BDMS.163.COM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="kafka";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/kafka.service.keytab"
storeKey=true
useTicketCache=false
serviceName="zookeeper"
principal="kafka/hzadg-mammut-platform3.server.163.org@BDMS.163.COM";
};

1.3 Create a new topic:

bin/kafka-topics.sh --create --zookeeper hzadg-mammut-platform2.server.163.org:2181,hzadg-mammut-platform3.server.163.org:2181 --replication-factor 1 --partitions 1 --topic spark-test

1.4 Start a console producer:

bin/kafka-console-producer.sh  --broker-list hzadg-mammut-platform2.server.163.org:6667,hzadg-mammut-platform3.server.163.org:6667,hzadg-mammut-platform4.server.163.org:6667 --topic spark-test --producer.config ./config/producer.properties

1.5 Test with a console consumer:

bin/kafka-console-consumer.sh --zookeeper hzadg-mammut-platform2.server.163.org:2181,hzadg-mammut-platform3.server.163.org:2181 --bootstrap-server hzadg-mammut-platform2.server.163.org:6667 --topic spark-test --from-beginning --new-consumer  --consumer.config ./config/consumer.properties

2. Create the HBase table

2.1 Run kinit as the hbase account first; otherwise the HBase table cannot be created:

kinit -kt /etc/security/keytabs/hbase.service.keytab hbase/hzadg-mammut-platform2.server.163.org@BDMS.163.COM
./bin/hbase shell
> create 'recsys_logs', 'f'

 

 

II. Write the Spark code

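Write a simple Spark Java program that consumes messages from Kafka and writes the message count of each batch to HBase. The full source is available in this project: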

https://github.com/LiShuMing/spark-demos

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.netease.spark.streaming.hbase;

import com.netease.spark.utils.Consts;
import com.netease.spark.utils.JConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class JavaKafkaToHBaseKerberos {
  private final static Logger LOGGER = LoggerFactory.getLogger(JavaKafkaToHBaseKerberos.class);

  private static HConnection connection = null;
  private static HTableInterface table = null;

  // Lazily creates the shared HBase connection and table handle.
  // Both are used only on the driver, inside foreachRDD.
  public static synchronized void openHBase(String tablename) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    if (connection == null) {
      connection = HConnectionManager.createConnection(conf);
    }
    if (table == null) {
      // Use the configured table name rather than a hard-coded "recsys_logs".
      table = connection.getTable(tablename);
    }
  }

  // Closes the table first, then the connection; failures are logged, not rethrown.
  public static void closeHBase() {
    if (table != null) {
      try {
        table.close();
      } catch (IOException e) {
        LOGGER.error("Failed to close table", e);
      }
    }
    if (connection != null) {
      try {
        connection.close();
      } catch (IOException e) {
        LOGGER.error("Failed to close connection", e);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    String hbaseTable = JConfig.getInstance().getProperty(Consts.HBASE_TABLE);
    String kafkaBrokers = JConfig.getInstance().getProperty(Consts.KAFKA_BROKERS);
    String kafkaTopics = JConfig.getInstance().getProperty(Consts.KAFKA_TOPICS);
    String kafkaGroup = JConfig.getInstance().getProperty(Consts.KAFKA_GROUP);

    // open hbase
    try {
      openHBase(hbaseTable);
    } catch (IOException e) {
      LOGGER.error("Failed to connect to HBase", e);
      System.exit(-1);
    }

    SparkConf conf = new SparkConf().setAppName("JavaKafkaToHBase");
    // One micro-batch per second.
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

    Set<String> topicsSet = new HashSet<>(Arrays.asList(kafkaTopics.split(",")));
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", kafkaBrokers);
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", kafkaGroup);
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", false);
    // Required in a Kerberos environment
    kafkaParams.put("security.protocol", "SASL_PLAINTEXT");

    // Create direct kafka stream with brokers and topics
    final JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
            ssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topicsSet.toArray(new String[0])), kafkaParams)
        );

    JavaDStream<String> lines = stream.map(new Function<ConsumerRecord<String, String>, String>() {
      private static final long serialVersionUID = -1801798365843350169L;

      @Override
      public String call(ConsumerRecord<String, String> record) {
        return record.value();
      }
    }).filter(new Function<String, Boolean>() {
      private static final long serialVersionUID = 7786877762996470593L;

      @Override
      public Boolean call(String msg) throws Exception {
        return msg.length() > 0;
      }
    });

    JavaDStream<Long> nums = lines.count();

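    // The body of call() runs on the driver once per batch, so the static table handle is usable here.
    nums.foreachRDD(new VoidFunction<JavaRDD<Long>>() {
      private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");

      @Override
      public void call(JavaRDD<Long> rdd) throws Exception {
        // count() yields a single-element RDD holding the batch size.
        Long num = rdd.take(1).get(0);
        // Row key: the wall-clock time at which the batch result is written.
        String ts = sdf.format(new Date());
        Put put = new Put(Bytes.toBytes(ts));
        // Column f:nums holds the count as an 8-byte long.
        put.add(Bytes.toBytes("f"), Bytes.toBytes("nums"), Bytes.toBytes(num));
        table.put(put);
      }
    });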

    ssc.start();
    ssc.awaitTermination();
    closeHBase();
  }
}
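
The job above stores each count with Bytes.toBytes(long), which produces 8 big-endian bytes, and uses a formatted timestamp as the row key. That is why a scan in the HBase shell shows the value as raw bytes rather than as a readable number. The sketch below reproduces that encoding with the JDK only, so the stored cells can be decoded or tested without an HBase client. The class CountCellCodec and its methods are hypothetical helpers, not part of the original project, and DateTimeFormatter is swapped in for SimpleDateFormat purely because it is thread-safe.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class CountCellCodec {

  // Same pattern as the SimpleDateFormat used in the streaming job.
  private static final DateTimeFormatter ROW_KEY_FORMAT =
      DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss");

  // Row key bytes: the formatted timestamp encoded as UTF-8.
  static byte[] rowKey(LocalDateTime time) {
    return ROW_KEY_FORMAT.format(time).getBytes(StandardCharsets.UTF_8);
  }

  // 8-byte big-endian encoding, matching what Bytes.toBytes(long) writes.
  static byte[] encodeCount(long count) {
    return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
  }

  // Inverse of encodeCount: reads an 8-byte big-endian long.
  static long decodeCount(byte[] cell) {
    return ByteBuffer.wrap(cell).getLong();
  }

  public static void main(String[] args) {
    byte[] key = rowKey(LocalDateTime.now());
    byte[] value = encodeCount(42L);
    System.out.println("row key = " + new String(key, StandardCharsets.UTF_8));
    System.out.println("decoded count = " + decodeCount(value));
  }
}
```

Because the row key has one-second resolution and the batch interval is also one second, two batches that finish within the same second would overwrite each other; a finer-grained key is one way to avoid that.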

 

III. Build and run on YARN

3.1 Change to the project directory and build the project:

mvn clean package

 

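3.2 Run the Spark job

  • The executors need to access Kafka, so the keytab of a Kerberos user that is authorized for Kafka must be shipped to the executors;
  • HDFS in the cluster is also secured with Kerberos, so spark.yarn.keytab and spark.yarn.principal must point to a keytab and principal that can access HDFS and HBase;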

Run the following from the project directory:

/usr/ndp/current/spark2_client/bin/spark-submit \
--files ./kafka_client_jaas.conf,./kafka.service.keytab \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--driver-java-options "-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--conf spark.yarn.keytab=/etc/security/keytabs/hbase.service.keytab \
--conf spark.yarn.principal=hbase/hzadg-mammut-platform1.server.163.org@BDMS.163.COM \
--class com.netease.spark.streaming.hbase.JavaKafkaToHBaseKerberos \
--master yarn \
--deploy-mode client \
./target/spark-demo-0.1.0-jar-with-dependencies.jar

The contents of kafka_client_jaas.conf are as follows:

cat kafka_client_jaas.conf

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
renewTicket=true
keyTab="./kafka.service.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="kafka/hzadg-mammut-platform1.server.163.org@BDMS.163.COM";
};
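
A malformed JAAS file, or one without a KafkaClient section, is a common reason for the consumer to fail at startup, so it can help to check the file before submitting the job. The following is a minimal sketch that parses a JAAS file with the JDK's standard "JavaLoginConfig" configuration type and returns the options of one section. The class JaasPreflight and the method loadSection are hypothetical names, not part of the original project, and the sketch only checks the syntax and the presence of the section; it does not attempt a Kerberos login.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class JaasPreflight {

  // Parses the JAAS file and returns the options of the first login module
  // in the named section; throws if the section is missing.
  static Map<String, ?> loadSection(Path jaasFile, String section)
      throws NoSuchAlgorithmException {
    Configuration cfg =
        Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile.toUri()));
    AppConfigurationEntry[] entries = cfg.getAppConfigurationEntry(section);
    if (entries == null || entries.length == 0) {
      throw new IllegalStateException("No section named " + section + " in " + jaasFile);
    }
    return entries[0].getOptions();
  }

  public static void main(String[] args) throws Exception {
    // Defaults to the file name used in the spark-submit command.
    Path jaasFile = Paths.get(args.length > 0 ? args[0] : "kafka_client_jaas.conf");
    Map<String, ?> options = loadSection(jaasFile, "KafkaClient");
    System.out.println("keyTab      = " + options.get("keyTab"));
    System.out.println("principal   = " + options.get("principal"));
    System.out.println("serviceName = " + options.get("serviceName"));
  }
}
```

Running it from the project directory against ./kafka_client_jaas.conf prints the keytab path and principal that the driver and executors would pick up from the same file.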

 

3.3 Results

 
 
 
