join streaming and static dataframes does nothing



So I'm trying to set up a stream from Kafka, and then add columns from a table to that DataFrame based on a matching id in the Kafka message and the table.



I've got two separate files; the first one calls a method in streamKafka_ASH to set up the stream and then prints out the contents.


val query = streamKafka_ASH.setupStream(spark, bootstrapservers, zkPath, topics, zkHosts, consumerGroup, batchIntervalSeconds.toInt, autoOffset, checkpointDir)
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(ProcessingTime(intervalMs = batchIntervalSeconds.toInt * 1000))
  .start()

query.awaitTermination()



And here is the code in streamKafka_ASH.setupStream(...):


val marketingTrackingSchema = new StructType()
  .add("id", StringType)
  .add("device", StringType)
  .add("type", StringType)
  .add("mtaid", StringType)
  .add("date", StringType)


val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topics)
  .option("failOnDataLoss", false)
  .option("group.id", consumerGroup)
  .option("startingOffsets", offsetReset)
  .option("checkpointLocation", checkpointDir)
  .load()
  .selectExpr("CAST(timestamp AS INT)", "CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(Int, String, String, String)]

// get the JSON from the Kafka message and parse it with "marketingTrackingSchema"
val df2 = df.select(
  col("timestamp").alias("kafka_timestamp"),
  col("topic"),
  $"value",
  from_json(col("value").cast("string"), marketingTrackingSchema).alias("data"))

// get the columns I need from the Kafka message
val expandDF2 = df2.select(
  col("kafka_timestamp"),
  col("data.id"),
  col("data.device"),
  col("data.type"),
  col("data.mtaid"),
  col("data.date")
)


// read in the ash config from an external Hive table
val testExtHiveDF = spark.read.table("hbase_external.ash_config")

// get the values I need from the external Hive table
val mtConfigDF = testExtHiveDF.selectExpr(
  "trackingid as mtaid",
  "relationshiptype",
  "channel",
  "subchannel"
)


expandDF2.join(mtConfigDF, "mtaid")
expandDF2.createOrReplaceTempView("test")

// this line is just to test if join worked, would normally return entire joined dataframe
val joinedDF2 = spark.sql("select channel from test")



I stepped through with the debugger, and after the join the DataFrames are identical; nothing changed. Then an error occurs at the joinedDF2 line:


Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`channel`' given input columns: [type, device, id, date, kafka_timestamp, mtaid]; line 1 pos 7;



Maybe I'm doing the join wrong, or reading the data in wrong? I've been working on this for a while; it seems simple, but I'm just not sure what to do.
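One thing I suspect might be related is immutability. Here's a plain-Scala sketch (no Spark; the names and data are made up to mirror expandDF2 and mtConfigDF) of the pattern I'm seeing, where a transformation whose result isn't captured leaves the original value untouched:

```scala
// Minimal plain-Scala sketch (no Spark). Like a DataFrame, an immutable
// collection is never modified in place: every transformation returns a
// NEW value, and the original stays exactly as it was.
object JoinDemo {
  // Hypothetical stand-ins for expandDF2 and mtConfigDF, keyed by mtaid.
  val events: Seq[(String, String)] = Seq(("mta1", "web"), ("mta2", "app"))
  val config: Map[String, String] = Map("mta1" -> "email", "mta2" -> "social")

  // The "join": looks up each event's channel and returns a NEW sequence.
  // Evaluating it without binding the result changes nothing observable.
  def joined: Seq[(String, String, String)] =
    events.map { case (id, device) => (id, device, config(id)) }
}
```

Calling `JoinDemo.joined` on its own has no effect on `JoinDemo.events`; only binding the result, e.g. `val withChannel = JoinDemo.joined`, makes the extra column usable afterwards.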









