Skip to content

Commit

Permalink
remove(metadatareader): topic strategy support
Browse files Browse the repository at this point in the history
This feature is planned to be put into a different PR.
  • Loading branch information
Attila Tóth committed Dec 20, 2021
1 parent 7459a8f commit a081323
Show file tree
Hide file tree
Showing 12 changed files with 27 additions and 906 deletions.
47 changes: 0 additions & 47 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,53 +329,6 @@ all of them.
complete backlog is read at once.</td>
</tr>

<tr>
<td>
`forwardStrategy`
</td>
<td>
`simple`, `large-first` or `proportional`
</td>
<td>`simple`</td>
<td>Streaming query</td>
<td>If `maxEntriesPerTrigger` is set, this parameter controls
which forwarding strategy is in use during the read of multiple
topics.
<li>
`simple` just divides the allowed number of entries equally
between all topics, regardless of their backlog size
</li>
<li>
`large-first` will load the largest topic backlogs first,
as the maximum number of allowed entries allows
</li>
<li>
`proportional` will forward all topics proportional to the
topic backlog/overall backlog ratio
</li>
</td>
</tr>

<tr>
<td>
`ensureEntriesPerTopic`
</td>
<td>Number to forward each topic with during a micro-batch.</td>
<td>0</td>
<td>Streaming query</td>
<td>If multiple topics are read, and the maximum number of
entries is also specified, always forward all topics with the
amount of entries specified here. Using this, users can ensure that topics
with considerably smaller backlogs than others are also forwarded
and read. Note that:
<li>If this number is higher than the maximum allowed entries divided
by the number of topics, then this value is taken into account, overriding
the maximum number of entries per micro-batch.
</li>
<li>This parameter has an effect only for forwarding strategies
`large-first` and `proportional`.</li>
</td>
</tr>
<tr>
<td>
`allowDifferentTopicSchemas`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,30 +225,14 @@ private[pulsar] case class PulsarMetadataReader(
}.toMap)
}


def forwardOffset(actualOffset: Map[String, MessageId],
strategy: String,
numberOfEntriesToForward: Long,
ensureEntriesPerTopic: Long): SpecificPulsarOffset = {
def fetchNextOffsetWithMaxEntries(actualOffset: Map[String, MessageId],
numberOfEntries: Long): SpecificPulsarOffset = {
getTopicPartitions()

// Collect internal stats for all topics
val topicStats = topicPartitions.map( topic => {
val internalStats = admin.topics().getInternalStats(topic)
val topicActualMessageId = actualOffset.getOrElse(topic, MessageId.earliest)
topic -> TopicState(internalStats,
PulsarSourceUtils.getLedgerId(topicActualMessageId),
PulsarSourceUtils.getEntryId(topicActualMessageId))
} ).toMap

val forwarder = strategy match {
case PulsarOptions.ProportionalForwardStrategy =>
new ProportionalForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
case PulsarOptions.LargeFirstForwardStrategy =>
new LargeFirstForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
case _ =>
new LinearForwardStrategy(numberOfEntriesToForward)
}
topic -> admin.topics().getInternalStats(topic)
} ).toMap.asJava

SpecificPulsarOffset(topicPartitions.map { topic =>
topic -> PulsarSourceUtils.seekableLatestMid {
Expand All @@ -262,39 +246,39 @@ private[pulsar] case class PulsarMetadataReader(
// Get the partition index
val partitionIndex = PulsarSourceUtils.getPartitionIndex(topicActualMessageId)
// Cache topic internal stats
val internalStats = topicStats.get(topic).get.internalStat
val internalStats = topicStats.get(topic)
// Calculate the amount of messages we will pull in
val numberOfEntriesPerTopic = forwarder.forward(topicStats)(topic)
// Get a future message ID which corresponds
// to the maximum number of messages
val numberOfEntriesPerTopic = numberOfEntries / topics.size
// Get a next message ID which respects
// the maximum number of messages
val (nextLedgerId, nextEntryId) = TopicInternalStatsUtils.forwardMessageId(
internalStats,
actualLedgerId,
actualEntryId,
numberOfEntriesPerTopic)
// Build a message id
val forwardedMessageId =
// Build the next message ID
val nextMessageId =
DefaultImplementation.newMessageId(nextLedgerId, nextEntryId, partitionIndex)
// Log state
val forwardedEntry = TopicInternalStatsUtils.numOfEntriesUntil(
val entryCountUntilNextMessageId = TopicInternalStatsUtils.numOfEntriesUntil(
internalStats, nextLedgerId, nextEntryId)
val entryCount = internalStats.numberOfEntries
val progress = f"${forwardedEntry.toFloat / entryCount.toFloat}%1.3f"
val logMessage = s"Pulsar Connector forward on topic. " +
s"[$numberOfEntriesPerTopic/$numberOfEntriesToForward]" +
val progress = f"${entryCountUntilNextMessageId.toFloat / entryCount.toFloat}%1.3f"
val logMessage = s"Pulsar Connector offset step forward. " +
s"[$numberOfEntriesPerTopic/$numberOfEntries]" +
s"${topic.reverse.take(30).reverse} $topicActualMessageId -> " +
s"$forwardedMessageId ($forwardedEntry/$entryCount) [$progress]"
s"$nextMessageId ($entryCountUntilNextMessageId/$entryCount) [$progress]"
log.debug(logMessage)
// Return the message ID
forwardedMessageId
nextMessageId
} catch {
case e: PulsarAdminException if e.getStatusCode == 404 =>
MessageId.earliest
case e: Throwable =>
throw new RuntimeException(
s"Failed to get forwarded messageId for ${TopicName.get(topic).toString} " +
s"(tried to forward ${forwarder.forward(topicStats)(topic)} messages " +
s"starting from `$topicActualMessageId` using strategy $strategy)", e)
s"(tried to forward ${numberOfEntries} messages " +
s"starting from `$topicActualMessageId`)", e)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ private[pulsar] object PulsarOptions {
val PartitionSuffix: String = TopicName.PARTITIONED_TOPIC_SUFFIX

val MaxEntriesPerTrigger = "maxentriespertrigger"
val EnsureEntriesPerTopic = "ensureentriespertopic"
val ForwardStrategy = "forwardstrategy"
val ProportionalForwardStrategy = "proportional"
val LargeFirstForwardStrategy = "large-first"

val TopicOptionKeys = Set(
TopicSingle,
Expand Down
10 changes: 1 addition & 9 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ private[pulsar] class PulsarProvider
failOnDataLoss(caseInsensitiveParams),
subscriptionNamePrefix,
jsonOptions,
maxEntriesPerTrigger(caseInsensitiveParams),
minEntriesPerTopic(caseInsensitiveParams),
forwardStrategy(caseInsensitiveParams)
maxEntriesPerTrigger(caseInsensitiveParams)
)
}

Expand Down Expand Up @@ -384,12 +382,6 @@ private[pulsar] object PulsarProvider extends Logging {
private def maxEntriesPerTrigger(caseInsensitiveParams: Map[String, String]): Long =
caseInsensitiveParams.getOrElse(MaxEntriesPerTrigger, "-1").toLong

private def minEntriesPerTopic(caseInsensitiveParams: Map[String, String]): Long =
caseInsensitiveParams.getOrElse(EnsureEntriesPerTopic, "0").toLong

private def forwardStrategy(caseInsensitiveParams: Map[String, String]): String =
caseInsensitiveParams.getOrElse(ForwardStrategy, "simple")

private def validateGeneralOptions(
caseInsensitiveParams: Map[String, String]): Map[String, String] = {
if (!caseInsensitiveParams.contains(ServiceUrlOptionKey)) {
Expand Down
12 changes: 5 additions & 7 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ private[pulsar] class PulsarSource(
failOnDataLoss: Boolean,
subscriptionNamePrefix: String,
jsonOptions: JSONOptionsInRead,
maxEntriesPerTrigger: Long,
ensureEntriesPerTopic: Long,
forwardStrategy: String)
maxEntriesPerTrigger: Long)
extends Source
with Logging {

Expand Down Expand Up @@ -72,11 +70,11 @@ private[pulsar] class PulsarSource(
} else {
currentTopicOffsets match {
case Some(value) =>
metadataReader.forwardOffset(value,
forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
metadataReader.fetchNextOffsetWithMaxEntries(value,
maxEntriesPerTrigger)
case _ =>
metadataReader.forwardOffset(initialTopicOffsets.topicOffsets,
forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
metadataReader.fetchNextOffsetWithMaxEntries(initialTopicOffsets.topicOffsets,
maxEntriesPerTrigger)
}
}
logDebug(s"GetOffset: ${nextOffsets.topicOffsets.toSeq.map(_.toString).sorted}")
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit a081323

Please sign in to comment.