Skip to content

Commit

Permalink
[improve][fn] Optimize Function Worker startup by lazy loading and di…
Browse files Browse the repository at this point in the history
…rect zip/bytecode access (apache#22122)

(cherry picked from commit bbc6224)
(cherry picked from commit 597cfa1)

(cherry picked from commit d608bfc)

 Conflicts:
	pulsar-broker/pom.xml
	pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
	pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
	pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
	pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
	pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
	pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
	pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
	pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java
	pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
	pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
	pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
	pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
	pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
	pulsar-functions/worker/pom.xml
	pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
	pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
	pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
	pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
	pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
	pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
	pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
	pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
  • Loading branch information
lhotari authored and nikhil-ctds committed Mar 5, 2024
1 parent c4ff7b9 commit 78f6677
Show file tree
Hide file tree
Showing 46 changed files with 4,650 additions and 5,219 deletions.
15 changes: 14 additions & 1 deletion conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ metadataStoreOperationTimeoutSeconds: 30
# Metadata store cache expiry time in seconds
metadataStoreCacheExpirySeconds: 300

# Specifies if the function worker should use classloading for validating submissions for built-in
# connectors and functions. This is required for validateConnectorConfig to take effect.
# Default is false.
enableClassloadingOfBuiltinFiles: false

# Specifies if the function worker should use classloading for validating submissions for external
# connectors and functions. This is required for validateConnectorConfig to take effect.
# Default is false.
enableClassloadingOfExternalFiles: false

################################
# Function package management
################################
Expand Down Expand Up @@ -390,7 +400,10 @@ saslJaasServerRoleTokenSignerSecretPath:
connectorsDirectory: ./connectors
functionsDirectory: ./functions

# Should connector config be validated during submission
# Enables extended validation for connector config with fine-grain annotation based validation
# during submission. Classloading with either enableClassloadingOfExternalFiles or
# enableClassloadingOfBuiltinFiles must be enabled on the worker for this to take effect.
# Default is false.
validateConnectorConfig: false

# Whether to initialize distributed log metadata by runtime.
Expand Down
4 changes: 4 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,10 @@ The Apache Software License, Version 2.0
* Jodah
- net.jodah-typetools-0.5.0.jar
- net.jodah-failsafe-2.4.4.jar
* Byte Buddy
- net.bytebuddy-byte-buddy-1.14.12.jar
* zt-zip
- org.zeroturnaround-zt-zip-1.17.jar
* Apache Avro
- org.apache.avro-avro-1.10.2.jar
- org.apache.avro-avro-protobuf-1.10.2.jar
Expand Down
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ flexible messaging model and an intuitive client API.</description>
<puppycrawl.checkstyle.version>8.37</puppycrawl.checkstyle.version>
<dockerfile-maven.version>1.4.13</dockerfile-maven.version>
<typetools.version>0.5.0</typetools.version>
<byte-buddy.version>1.14.12</byte-buddy.version>
<zt-zip.version>1.17</zt-zip.version>
<protobuf3.version>3.19.6</protobuf3.version>
<protoc3.version>${protobuf3.version}</protoc3.version>
<grpc.version>1.45.1</grpc.version>
Expand Down Expand Up @@ -966,6 +968,18 @@ flexible messaging model and an intuitive client API.</description>
<version>${typetools.version}</version>
</dependency>

<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>${byte-buddy.version}</version>
</dependency>

<dependency>
<groupId>org.zeroturnaround</groupId>
<artifactId>zt-zip</artifactId>
<version>${zt-zip.version}</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
Expand Down
29 changes: 29 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,24 @@
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-cassandra</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-io-cassandra.nar</destFileName>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-twitter</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-io-twitter.nar</destFileName>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-data-generator</artifactId>
Expand Down Expand Up @@ -483,6 +501,15 @@
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-functions-api-examples.nar</destFileName>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-api-examples</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-functions-api-examples.nar</destFileName>
</artifactItem>
</artifactItems>
</configuration>
</execution>
Expand All @@ -498,6 +525,8 @@
<pulsar-functions-api-examples.jar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-functions-api-examples.jar.path>
<pulsar-functions-api-examples.nar.path>${project.build.directory}/pulsar-functions-api-examples.nar</pulsar-functions-api-examples.nar.path>
<pulsar-io-batch-data-generator.nar.path>${project.build.directory}/pulsar-io-batch-data-generator.nar</pulsar-io-batch-data-generator.nar.path>
<pulsar-io-cassandra.nar.path>${project.build.directory}/pulsar-io-cassandra.nar</pulsar-io-cassandra.nar.path>
<pulsar-io-twitter.nar.path>${project.build.directory}/pulsar-io-twitter.nar</pulsar-io-twitter.nar.path>
<!-- workaround issue #13750 which gets triggered if org.apache.bookkeeper.meta.MetadataDrivers class gets loaded before org.apache.pulsar.metadata.bookkeeper.BKCluster constructor is called -->
<bookkeeper.metadata.bookie.drivers>org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver</bookkeeper.metadata.bookie.drivers>
<bookkeeper.metadata.client.drivers>org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver</bookkeeper.metadata.client.drivers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ public NarClassLoader run() {
});
}

public static List<File> getClasspathFromArchive(File narPath, String narExtractionDirectory) throws IOException {
File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
return getClassPathEntries(unpacked);
}

private static File getNarExtractionDirectory(String configuredDirectory) {
return new File(configuredDirectory + "/" + TMP_DIR_PREFIX);
}
Expand All @@ -164,16 +169,11 @@ private static File getNarExtractionDirectory(String configuredDirectory) {
* @param narWorkingDirectory
* directory to explode nar contents to
* @param parent
* @throws IllegalArgumentException
* if the NAR is missing the Java Services API file for <tt>FlowFileProcessor</tt> implementations.
* @throws ClassNotFoundException
* if any of the <tt>FlowFileProcessor</tt> implementations defined by the Java Services API cannot be
* loaded.
* @throws IOException
* if an error occurs while loading the NAR.
*/
private NarClassLoader(final File narWorkingDirectory, Set<String> additionalJars, ClassLoader parent)
throws ClassNotFoundException, IOException {
throws IOException {
super(new URL[0], parent);
this.narWorkingDirectory = narWorkingDirectory;

Expand Down Expand Up @@ -238,22 +238,31 @@ public List<String> getServiceImplementation(String serviceName) throws IOExcept
* if the URL list could not be updated.
*/
private void updateClasspath(File root) throws IOException {
addURL(root.toURI().toURL()); // for compiled classes, META-INF/, etc.
getClassPathEntries(root).forEach(f -> {
try {
addURL(f.toURI().toURL());
} catch (IOException e) {
log.error("Failed to add entry to classpath: {}", f, e);
}
});
}

static List<File> getClassPathEntries(File root) {
List<File> classPathEntries = new ArrayList<>();
classPathEntries.add(root);
File dependencies = new File(root, "META-INF/bundled-dependencies");
if (!dependencies.isDirectory()) {
log.warn("{} does not contain META-INF/bundled-dependencies!", narWorkingDirectory);
log.warn("{} does not contain META-INF/bundled-dependencies!", root);
}
addURL(dependencies.toURI().toURL());
classPathEntries.add(dependencies);
if (dependencies.isDirectory()) {
final File[] jarFiles = dependencies.listFiles(JAR_FILTER);
if (jarFiles != null) {
Arrays.sort(jarFiles, Comparator.comparing(File::getName));
for (File libJar : jarFiles) {
addURL(libJar.toURI().toURL());
}
classPathEntries.addAll(Arrays.asList(jarFiles));
}
}
return classPathEntries;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import lombok.extern.slf4j.Slf4j;

/**
Expand Down Expand Up @@ -114,16 +115,22 @@ static File doUnpackNar(final File nar, final File baseWorkingDirectory, Runnabl
* if the NAR could not be unpacked.
*/
private static void unpack(final File nar, final File workingDirectory) throws IOException {
try (JarFile jarFile = new JarFile(nar)) {
Enumeration<JarEntry> jarEntries = jarFile.entries();
while (jarEntries.hasMoreElements()) {
JarEntry jarEntry = jarEntries.nextElement();
String name = jarEntry.getName();
File f = new File(workingDirectory, name);
if (jarEntry.isDirectory()) {
Path workingDirectoryPath = workingDirectory.toPath().normalize();
try (ZipFile zipFile = new ZipFile(nar)) {
Enumeration<? extends ZipEntry> zipEntries = zipFile.entries();
while (zipEntries.hasMoreElements()) {
ZipEntry zipEntry = zipEntries.nextElement();
String name = zipEntry.getName();
Path targetFilePath = workingDirectoryPath.resolve(name).normalize();
if (!targetFilePath.startsWith(workingDirectoryPath)) {
log.error("Invalid zip file with entry '{}'", name);
throw new IOException("Invalid zip file. Aborting unpacking.");
}
File f = targetFilePath.toFile();
if (zipEntry.isDirectory()) {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(f);
} else {
makeFile(jarFile.getInputStream(jarEntry), f);
makeFile(zipFile.getInputStream(zipEntry), f);
}
}
}
Expand Down
Loading

0 comments on commit 78f6677

Please sign in to comment.