Vraag Spark Streaming Job kan niet worden hersteld


Ik gebruik een vonkstroomopdracht die mapWithState gebruikt met een initiële RDD. Bij het opnieuw starten van de toepassing en het herstellen van het controlepunt mislukt de fout:

Deze RDD mist een SparkContext. Het kan in de volgende gevallen gebeuren:

  1. RDD-transformaties en acties worden NIET aangeroepen door de bestuurder, maar binnen andere transformaties; rdd1.map (x => rdd2.values.count () * x) is bijvoorbeeld ongeldig omdat de waardentransformatie en telactie niet kunnen worden uitgevoerd binnen de rdd1.map-transformatie. Zie SPARK-5063 voor meer informatie.
  2. Wanneer een Spark Streaming-opdracht wordt hersteld vanaf het controlepunt, wordt deze uitzondering getroffen als een verwijzing naar een RDD die niet door de streaming-opdracht is gedefinieerd, wordt gebruikt in DStream-bewerkingen. Zie SPARK-13758 voor meer informatie

Dit gedrag wordt beschreven in https://issues.apache.org/jira/browse/SPARK-13758 maar het wordt niet echt beschreven hoe het op te lossen. Mijn RDD wordt niet gedefinieerd door de streaming-opdracht, maar ik heb deze nog steeds nodig in de staat.

Dit is een voorbeeld van hoe mijn grafiek eruit ziet:

class EventStreamingApplication {
  private val config: Config = ConfigFactory.load()
  private val sc: SparkContext = {
    val conf = new SparkConf()
      .setAppName(config.getString("streaming.appName"))
      .set("spark.cassandra.connection.host", config.getString("streaming.cassandra.host"))
    val sparkContext = new SparkContext(conf)
    System.setProperty("com.amazonaws.services.s3.enableV4", "true")
    sparkContext.hadoopConfiguration.set("com.amazonaws.services.s3.enableV4", "true")
    sparkContext
  }

  def run(): Unit = {
    // streaming.eventCheckpointDir is an S3 Bucket
    val ssc: StreamingContext = StreamingContext.getOrCreate(config.getString("streaming.eventCheckpointDir"), createStreamingContext)
    ssc.start()
    ssc.awaitTermination()
  }

  def receiver(ssc: StreamingContext): DStream[Event] = {
    RabbitMQUtils.createStream(ssc, Map(
      "hosts" -> config.getString("streaming.rabbitmq.host"),
      "virtualHost" -> config.getString("streaming.rabbitmq.virtualHost"),
      "userName" -> config.getString("streaming.rabbitmq.user"),
      "password" -> config.getString("streaming.rabbitmq.password"),
      "exchangeName" -> config.getString("streaming.rabbitmq.eventExchange"),
      "exchangeType" -> config.getString("streaming.rabbitmq.eventExchangeType"),
      "queueName" -> config.getString("streaming.rabbitmq.eventQueue")
    )).flatMap(EventParser.apply)
  }

  def setupStreams(ssc: StreamingContext): Unit = {
    val events = receiver(ssc)
    ExampleJob(events, sc)
  }

  private def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(config.getInt("streaming.batchSeconds")))
    setupStreams(ssc)
    ssc.checkpoint(config.getString("streaming.eventCheckpointDir"))
    ssc
  }
}

case class Aggregation(value: Long) // Contains aggregation values

object ExampleJob {
  def apply(events: DStream[Event], sc: SparkContext): Unit = {
    val aggregations: RDD[(String, Aggregation)] = sc.cassandraTable('...', '...').map(...) // some domain class mapping
    val state = StateSpec
      .function((key, value, state) => {
        val oldValue = state.getOption().map(_.value).getOrElse(0)
        val newValue = oldValue + value.getOrElse(0)
        state.update(Aggregation(newValue))
        state.get
      })
      .initialState(aggregations)
      .numPartitions(1)
      .timeout(Seconds(86400))
    events
      .filter(...) // filter out unnecessary events
      .map(...) // domain class mapping to key, event dstream
      .groupByKey()
      .map(i => (i._1, i._2.size.toLong))
      .mapWithState(state)
      .stateSnapshots()
      .foreachRDD(rdd => {
        rdd.saveToCassandra(...)
      })
  }
}

De gegooide stacktrack is:

Exception in thread "main" org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: 
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
  at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:534)
  at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:193)
  at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:146)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
  at scala.Option.orElse(Option.scala:289)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
  at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
  at scala.Option.orElse(Option.scala:289)
  ...
  <991 lines omitted>
  ...
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
  at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
  at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
  at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
  at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571)
  at com.example.spark.EventStreamingApplication.run(EventStreamingApplication.scala:31)
  at com.example.spark.EventStreamingApplication$.main(EventStreamingApplication.scala:63)
  at com.example.spark.EventStreamingApplication.main(EventStreamingApplication.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:497)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

11
2018-06-23 08:45


oorsprong


antwoorden:


Het lijkt erop dat terwijl de vonk probeert te herstellen, het laatste controlepuntbestand niet wordt gepickt. Vanwege deze onjuiste RDD's worden doorverwezen.

Het lijkt erop dat vonkversie 2.1.1 wordt beïnvloed, omdat deze zich niet in de lijst met vaste versies bevindt.

Raadpleeg onderstaande link voor apache documentatie waar fix release nog niet is gespecificeerd.

https://issues.apache.org/jira/browse/SPARK-19280

Naar mijn mening kunt u proberen de automatische / handmatige oplossing te verkennen, waar u het nieuwste controlepuntbestand kunt opgeven terwijl de vonkjob opnieuw wordt gestart.

Ik weet dat het niet veel helpt, maar ik dacht dat het beter is om je uit te leggen wat de oorzaak is voor dit probleem en de huidige ontwikkeling om het probleem op te lossen en mijn mening over een mogelijke oplossing.


0
2017-07-02 23:02