add: parameters to control the number and distribution of messages in a micro-batch #63

Open · wants to merge 3 commits into master
README.md (21 additions, 1 deletion)
@@ -305,8 +305,28 @@ You can use `org.apache.spark.sql.pulsar.JsonUtils.topicOffsets(Map[String, Mess
This may cause a false alarm. You can set it to `false` when it does not work as expected. <br>

A batch query always fails if it fails to read any data from the provided offsets due to data loss.</td>

</tr>
<tr>
<td>
`maxEntriesPerTrigger`
</td>
<td>
Number of entries to include in a single micro-batch during
streaming.
</td>
<td>-1</td>
<td>Streaming query</td>
<td>This parameter controls how many Pulsar entries the connector
reads from the topic backlog in a single micro-batch. If the topic
backlog is very large, this option can be used to bound the size of
each micro-batch. If multiple topics are read, this limit is the
total number of entries fetched across all of them, split evenly
between the topics.

*Note:* Entries might contain multiple messages. The default value of `-1` means that the
complete backlog is read at once.</td>
</tr>

<tr>
<td>
`allowDifferentTopicSchemas`
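To illustrate the new option, here is a minimal usage sketch (the service/admin URLs and the topic name are placeholder assumptions, not part of this change):

```scala
// Read at most 500 entries per micro-batch; with several topics the
// 500-entry budget is split evenly across them.
val stream = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "persistent://public/default/events")
  .option("maxEntriesPerTrigger", "500")
  .load()
```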
src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala
@@ -22,11 +22,13 @@ import java.util.regex.Pattern
import org.apache.pulsar.client.admin.{PulsarAdmin, PulsarAdminException}
import org.apache.pulsar.client.api.{Message, MessageId, PulsarClient}
import org.apache.pulsar.client.impl.schema.BytesSchema
import org.apache.pulsar.client.internal.DefaultImplementation
import org.apache.pulsar.common.naming.TopicName
import org.apache.pulsar.common.schema.SchemaInfo

import org.apache.spark.internal.Logging
import org.apache.spark.sql.pulsar.PulsarOptions._
import org.apache.spark.sql.pulsar.topicinternalstats.forward._
import org.apache.spark.sql.types.StructType

/**
@@ -259,6 +261,68 @@ private[pulsar] case class PulsarMetadataReader(
    }.toMap)
  }

  def fetchNextOffsetWithMaxEntries(
      actualOffset: Map[String, MessageId],
      numberOfEntries: Long): SpecificPulsarOffset = {
    getTopicPartitions()

    // Collect internal stats for all topic partitions
    val topicStats = topicPartitions.map { topic =>
      topic -> admin.topics().getInternalStats(topic)
    }.toMap.asJava

    SpecificPulsarOffset(topicPartitions.map { topic =>
      topic -> PulsarSourceUtils.seekableLatestMid {
        // Fetch the current offset for the topic
        val topicActualMessageId = actualOffset.getOrElse(topic, MessageId.earliest)
        try {
          // Extract the ledger ID, entry ID and partition index
          val actualLedgerId = PulsarSourceUtils.getLedgerId(topicActualMessageId)
          val actualEntryId = PulsarSourceUtils.getEntryId(topicActualMessageId)
          val partitionIndex = PulsarSourceUtils.getPartitionIndex(topicActualMessageId)
          // Cache the topic internal stats
          val internalStats = topicStats.get(topic)
          // Split the entry budget evenly between the topics
          val numberOfEntriesPerTopic = numberOfEntries / topics.size
          // Compute the next message ID that respects the per-topic entry limit
          val (nextLedgerId, nextEntryId) = TopicInternalStatsUtils.forwardMessageId(
            internalStats,
            actualLedgerId,
            actualEntryId,
            numberOfEntriesPerTopic)
          // Build the next message ID
          val nextMessageId =
            DefaultImplementation
              .getDefaultImplementation
              .newMessageId(nextLedgerId, nextEntryId, partitionIndex)
          // Log the progress made within the topic backlog
          val entryCountUntilNextMessageId = TopicInternalStatsUtils.numOfEntriesUntil(
            internalStats, nextLedgerId, nextEntryId)
          val entryCount = internalStats.numberOfEntries
          val progress = f"${entryCountUntilNextMessageId.toFloat / entryCount.toFloat}%1.3f"
          val logMessage = s"Pulsar Connector offset step forward. " +
            s"[$numberOfEntriesPerTopic/$numberOfEntries] " +
            s"${topic.reverse.take(30).reverse} $topicActualMessageId -> " +
            s"$nextMessageId ($entryCountUntilNextMessageId/$entryCount) [$progress]"
          log.debug(logMessage)
          // Return the next message ID
          nextMessageId
        } catch {
          case e: PulsarAdminException if e.getStatusCode == 404 =>
            MessageId.earliest
          case e: Throwable =>
            throw new RuntimeException(
              s"Failed to get forwarded messageId for ${TopicName.get(topic).toString} " +
                s"(tried to forward $numberOfEntries entries " +
                s"starting from `$topicActualMessageId`)", e)
        }
      }
    }.toMap)
  }

  def fetchLatestOffsetForTopic(topic: String): MessageId = {
    val messageId =
      try {
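A quick sketch of the per-topic budget arithmetic used by `fetchNextOffsetWithMaxEntries` above (topic names and counts are illustrative):

```scala
// The total entry budget is divided evenly between the topics
// (integer division, so any remainder is dropped).
val numberOfEntries = 900L
val topics = Seq("topic-a", "topic-b", "topic-c")
val numberOfEntriesPerTopic = numberOfEntries / topics.size // 300 entries each
```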
src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala
@@ -37,6 +37,8 @@ private[pulsar] object PulsarOptions {

  val TopicOptionKeys: Set[String] = Set(TopicSingle, TopicMulti, TopicPattern)

  val MaxEntriesPerTrigger: String = "maxentriespertrigger"

  val ServiceUrlOptionKey: String = "service.url"
  val AdminUrlOptionKey: String = "admin.url"
  val StartingOffsetsOptionKey: String = "startingOffsets".toLowerCase(Locale.ROOT)
src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
@@ -113,7 +113,9 @@ private[pulsar] class PulsarProvider
      pollTimeoutMs(caseInsensitiveParams),
      failOnDataLoss(caseInsensitiveParams),
      subscriptionNamePrefix,
-     jsonOptions)
+     jsonOptions,
+     maxEntriesPerTrigger(caseInsensitiveParams)
+   )
  }

  override def createRelation(
@@ -395,6 +397,9 @@ private[pulsar] object PulsarProvider extends Logging {
        (SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000).toString)
      .toInt

  private def maxEntriesPerTrigger(caseInsensitiveParams: Map[String, String]): Long =
    caseInsensitiveParams.getOrElse(MaxEntriesPerTrigger, "-1").toLong

  private def validateGeneralOptions(
      caseInsensitiveParams: Map[String, String]): Map[String, String] = {
    if (!caseInsensitiveParams.contains(ServiceUrlOptionKey)) {
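The option key constant is lowercase because lookups happen against a lowercased parameter map, so `maxEntriesPerTrigger` can be written in any casing in `.option(...)`. A tiny sketch of the parsing path (the map literal is an illustrative assumption):

```scala
// The provider sees option keys lowercased, then parses the value:
val caseInsensitiveParams = Map("maxentriespertrigger" -> "500")
val maxEntries = caseInsensitiveParams.getOrElse("maxentriespertrigger", "-1").toLong // 500L
```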
src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala (17 additions, 9 deletions)
@@ -36,7 +36,8 @@ private[pulsar] class PulsarSource(
    pollTimeoutMs: Int,
    failOnDataLoss: Boolean,
    subscriptionNamePrefix: String,
-   jsonOptions: JSONOptionsInRead)
+   jsonOptions: JSONOptionsInRead,
+   maxEntriesPerTrigger: Long)
  extends Source
  with Logging {

@@ -59,12 +60,21 @@
  override def schema(): StructType = SchemaUtils.pulsarSourceSchema(pulsarSchema)

  override def getOffset: Option[Offset] = {
    // Make sure initialTopicOffsets is initialized
    initialTopicOffsets
-   val latest = metadataReader.fetchLatestOffsets()
-   currentTopicOffsets = Some(latest.topicOffsets)
-   logDebug(s"GetOffset: ${latest.topicOffsets.toSeq.map(_.toString).sorted}")
-   Some(latest.asInstanceOf[Offset])
+   val nextOffsets = if (maxEntriesPerTrigger == -1) {
+     // Default (-1): jump straight to the latest available offsets
+     metadataReader.fetchLatestOffsets()
+   } else {
+     // Bounded: step forward from the previous batch end (or from the
+     // initial offsets on the very first batch) by at most maxEntriesPerTrigger
+     currentTopicOffsets match {
+       case Some(value) =>
+         metadataReader.fetchNextOffsetWithMaxEntries(value, maxEntriesPerTrigger)
+       case None =>
+         metadataReader.fetchNextOffsetWithMaxEntries(
+           initialTopicOffsets.topicOffsets, maxEntriesPerTrigger)
+     }
+   }
+   logDebug(s"GetOffset: ${nextOffsets.topicOffsets.toSeq.map(_.toString).sorted}")
+   Some(nextOffsets.asInstanceOf[Offset])
  }

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
@@ -74,9 +84,7 @@
    logInfo(s"getBatch called with start = $start, end = $end")
    val endTopicOffsets = SpecificPulsarOffset.getTopicOffsets(end)

-   if (currentTopicOffsets.isEmpty) {
-     currentTopicOffsets = Some(endTopicOffsets)
-   }
+   // Always record the batch end, so the next getOffset step starts from it
+   currentTopicOffsets = Some(endTopicOffsets)

    if (start.isDefined && start.get == end) {
      return sqlContext.internalCreateDataFrame(
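Since `getOffset` now advances by a bounded step, the option composes with a processing-time trigger to drain a backlog at a predictable pace. A hedged sketch, reusing the `stream` DataFrame from the README sketch above (the sink and the interval are assumptions):

```scala
import org.apache.spark.sql.streaming.Trigger

// Each trigger fires every 30 seconds, and getOffset advances by at most
// maxEntriesPerTrigger entries, bounding the size of every micro-batch.
val query = stream.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
```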
src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala (30 additions)
@@ -120,6 +120,36 @@ private[pulsar] object PulsarSourceUtils extends Logging {
}
}

  def getLedgerId(mid: MessageId): Long = {
    mid match {
      case bmid: BatchMessageIdImpl => bmid.getLedgerId
      case midi: MessageIdImpl => midi.getLedgerId
      case t: TopicMessageIdImpl => getLedgerId(t.getInnerMessageId)
      case up: UserProvidedMessageId => up.getLedgerId
    }
  }

  def getEntryId(mid: MessageId): Long = {
    mid match {
      case bmid: BatchMessageIdImpl => bmid.getEntryId
      case midi: MessageIdImpl => midi.getEntryId
      case t: TopicMessageIdImpl => getEntryId(t.getInnerMessageId)
      case up: UserProvidedMessageId => up.getEntryId
    }
  }

  def getPartitionIndex(mid: MessageId): Int = {
    mid match {
      case bmid: BatchMessageIdImpl => bmid.getPartitionIndex
      case midi: MessageIdImpl => midi.getPartitionIndex
      case t: TopicMessageIdImpl => getPartitionIndex(t.getInnerMessageId)
      case up: UserProvidedMessageId => up.getPartitionIndex
    }
  }

  def seekableLatestMid(mid: MessageId): MessageId = {
    if (messageExists(mid)) mid else MessageId.earliest
  }
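A small sketch of the new accessors, assuming Pulsar's `MessageIdImpl` and code living inside the `org.apache.spark.sql.pulsar` package (since `PulsarSourceUtils` is `private[pulsar]`); the ledger/entry/partition values are arbitrary:

```scala
import org.apache.pulsar.client.impl.MessageIdImpl

val mid = new MessageIdImpl(12L, 34L, 0)
PulsarSourceUtils.getLedgerId(mid)       // 12
PulsarSourceUtils.getEntryId(mid)        // 34
PulsarSourceUtils.getPartitionIndex(mid) // 0
```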
src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicInternalStatsUtils.scala (new file)
@@ -0,0 +1,137 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.pulsar.topicinternalstats.forward

import scala.collection.JavaConverters.asScalaBufferConverter

import org.apache.pulsar.common.policies.data.{ManagedLedgerInternalStats, PersistentTopicInternalStats}

import org.apache.spark.sql.pulsar.topicinternalstats.forward.TopicInternalStatsUtils._

object TopicInternalStatsUtils {

  def forwardMessageId(stats: PersistentTopicInternalStats,
                       startLedgerId: Long,
                       startEntryId: Long,
                       forwardByEntryCount: Long): (Long, Long) = {
    val ledgers = fixLastLedgerInInternalStat(stats).ledgers.asScala.toList
    if (stats.ledgers.isEmpty || (forwardByEntryCount < 1)) {
      // If there is no ledger info, or there is nothing to forward, stay at the current ID
      (startLedgerId, startEntryId)
    } else {
      // Find the index of the start ledger in the list by its ledger ID
      val startLedgerIndex: Int =
        if (ledgers.exists(_.ledgerId == startLedgerId)) {
          // If found, start from there
          ledgers.indexWhere(_.ledgerId == startLedgerId)
        } else if (startLedgerId == -1) {
          // If not present and the start ID is -1, start from the beginning
          0
        } else {
          // In any other case, start from the last ledger
          ledgers.size - 1
        }

      // Clip the start entry ID within the start ledger if needed
      val startEntryIndex = Math.min(Math.max(startEntryId, 0), ledgers(startLedgerIndex).entries)

      // Create an iterator over the ledgers list
      val statsIterator =
        new PersistentTopicInternalStatsIterator(stats, startLedgerIndex, startEntryIndex)

      // Advance it by the number of entries to forward
      val (forwardedLedgerId, forwardedEntryId) =
        (1L to forwardByEntryCount).map(_ => statsIterator.next()).last

      (forwardedLedgerId, forwardedEntryId)
    }
  }

  def numOfEntriesUntil(stats: PersistentTopicInternalStats,
                        ledgerId: Long,
                        entryId: Long): Long = {
    val ledgers = fixLastLedgerInInternalStat(stats).ledgers.asScala
    if (ledgers.isEmpty) {
      0
    } else {
      val ledgersBeforeStartLedger = fixLastLedgerInInternalStat(stats).ledgers
        .asScala
        .filter(_.ledgerId < ledgerId)
      val entriesInLastLedger = if (ledgersBeforeStartLedger.isEmpty) {
        Math.max(entryId, 0)
      } else {
        Math.min(Math.max(entryId, 0), ledgersBeforeStartLedger.last.entries)
      }
      entriesInLastLedger + ledgersBeforeStartLedger.map(_.entries).sum
    }
  }

  def numOfEntriesAfter(stats: PersistentTopicInternalStats,
                        ledgerId: Long,
                        entryId: Long): Long = {
    val ledgers = fixLastLedgerInInternalStat(stats).ledgers.asScala
    if (ledgers.isEmpty) {
      0
    } else {
      val entryCountIncludingCurrentLedger = fixLastLedgerInInternalStat(stats).ledgers
        .asScala
        .filter(_.ledgerId >= ledgerId)
      val entriesInFirstLedger = if (entryCountIncludingCurrentLedger.isEmpty) {
        Math.max(entryId, 0)
      } else {
        Math.min(Math.max(entryId, 0), entryCountIncludingCurrentLedger.last.entries)
      }
      entryCountIncludingCurrentLedger.map(_.entries).sum - entriesInFirstLedger
    }
  }

  def fixLastLedgerInInternalStat(
      stats: PersistentTopicInternalStats): PersistentTopicInternalStats = {
    if (stats.ledgers.isEmpty) {
      stats
    } else {
      // The currently open ledger reports zero entries in the ledger list;
      // copy the live entry count into it so the list is usable end-to-end
      val lastLedgerInfo = stats.ledgers.get(stats.ledgers.size() - 1)
      lastLedgerInfo.entries = stats.currentLedgerEntries
      stats.ledgers.set(stats.ledgers.size() - 1, lastLedgerInfo)
      stats
    }
  }

}

class PersistentTopicInternalStatsIterator(stats: PersistentTopicInternalStats,
                                           startLedgerIndex: Int,
                                           startEntryIndex: Long)
  extends Iterator[(Long, Long)] {
  val ledgers = fixLastLedgerInInternalStat(stats).ledgers.asScala.toList
  private var currentLedgerIndex = startLedgerIndex
  private var currentEntryIndex = startEntryIndex

  override def hasNext: Boolean = !isLast
  // True when pointing at the very last entry of the last ledger
  private def isLast: Boolean = (currentLedgerIndex == ledgers.size - 1) &&
    (currentEntryIndex == ledgers.last.entries - 1)

  override def next(): (Long, Long) = {
    // Do not move past the last element
    if (hasNext) {
      if (currentEntryIndex < (ledgers(currentLedgerIndex).entries - 1)) {
        // Stay within the current ledger
        currentEntryIndex += 1
      } else {
        // Advance to the next ledger
        currentLedgerIndex += 1
        currentEntryIndex = 0
      }
    }
    (ledgers(currentLedgerIndex).ledgerId, currentEntryIndex)
  }
}
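A hedged sketch exercising the helpers above with hand-built stats (the ledger IDs and entry counts are invented for illustration; `LedgerInfo` fields are set directly, since they are public fields in Pulsar's stats classes):

```scala
import java.util.Arrays

import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats
import org.apache.spark.sql.pulsar.topicinternalstats.forward.TopicInternalStatsUtils

// Ledger 10 is closed with 100 entries; ledger 11 is the currently open one.
val closedLedger = new LedgerInfo()
closedLedger.ledgerId = 10L
closedLedger.entries = 100L
val openLedger = new LedgerInfo()
openLedger.ledgerId = 11L

val stats = new PersistentTopicInternalStats()
stats.ledgers = Arrays.asList(closedLedger, openLedger)
stats.currentLedgerEntries = 40L // copied into openLedger by fixLastLedgerInInternalStat

// Forwarding 120 entries from (ledger 10, entry 0) crosses into ledger 11:
TopicInternalStatsUtils.forwardMessageId(stats, 10L, 0L, 120L) // (11, 20)
TopicInternalStatsUtils.numOfEntriesUntil(stats, 11L, 20L)     // 120
```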
src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicState.scala (new file)
@@ -0,0 +1,20 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.pulsar.topicinternalstats.forward

import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats

case class TopicState(internalStat: PersistentTopicInternalStats,
                      ledgerId: Long,
                      entryId: Long)