Spark可以使用Spark Streaming来读取Kafka中的数据,并将数据写入到Hive中。
以下是使用Spark Streaming读取Kafka并将数据写入Hive的方法:
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaToHive")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092",
"zookeeper.connect" -> "localhost:2181",
"group.id" -> "spark-streaming")
val topics = Set("topic1")
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
kafkaStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val hiveContext = new HiveContext(rdd.sparkContext)
import hiveContext.implicits._
val dataFrame = rdd.map(_._2).toDF("value")
dataFrame.write.mode(SaveMode.Append).saveAsTable("hive_table")
}
}
在上面的代码中,我们首先创建了一个HiveContext来连接到Hive。然后,我们将RDD中的数据转换为DataFrame,并使用DataFrame的write方法将数据保存到Hive表中。
ssc.start()
ssc.awaitTermination()
这将启动Spark Streaming并等待它从Kafka读取数据并将其写入Hive。
请注意,您需要确保在Spark应用程序中正确配置Hive和Kafka的连接参数,并在Spark启动命令中添加相关的库和依赖项。
这是一个基本的示例,您可以根据自己的需求进行修改和扩展。
辰迅云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
推荐阅读: kafka如何查看最新数据