How to configure camel.sink.marshal ? #1568

Open
tgib23 opened this issue Sep 27, 2023 · 0 comments

tgib23 commented Sep 27, 2023

I'm testing camel-azure-storage-blob-sink-kafka-connector on k8s.
So far, I've confirmed that the CamelHeader option works and that the log is uploaded to an arbitrary path.
Now I'm testing the camel.sink.marshal option to reduce storage usage on Azure Blob Storage, but I haven't been successful yet.
I tried setting camel.sink.marshal to zipfile, following some example configurations, but got the following error.

2023-09-27 03:14:35,322 ERROR [blob-connector|task-0] WorkerSinkTask{id=blob-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-blob-connector-0]
org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context
        at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:159)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:315)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
        at org.apache.camel.RuntimeCamelException.wrapRuntimeException(RuntimeCamelException.java:66)
        at org.apache.camel.support.service.BaseService.doFail(BaseService.java:413)
        at org.apache.camel.impl.engine.AbstractCamelContext.doFail(AbstractCamelContext.java:3550)
        at org.apache.camel.support.service.BaseService.fail(BaseService.java:342)
        at org.apache.camel.impl.engine.AbstractCamelContext.failOnStartup(AbstractCamelContext.java:5204)
        at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2642)
        at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
        at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2649)
        at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:262)
        at org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)
        at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
        at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:152)
        ... 9 more
Caused by: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:433)
        at org.apache.camel.impl.engine.AbstractCamelContext.doInit(AbstractCamelContext.java:2956)
        at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
        at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2630)
        ... 15 more
Caused by: org.apache.camel.component.kamelet.KameletNotFoundException: Kamelet with id ckcMarshal not found in locations: classpath:/kamelets
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:421)
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:430)
        ... 18 more
Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route ckcMarshal-4 at: >>> Marshal[CustomDataFormat[{{marshal}}]] <<< in route: Route(ckcMarshal-4)[From[kamelet://source?routeId=ckcMarshal... because of Cannot find data format in registry with ref: zipfile
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:241)
        at org.apache.camel.reifier.RouteReifier.createRoute(RouteReifier.java:75)
        at org.apache.camel.impl.DefaultModelReifierFactory.createRoute(DefaultModelReifierFactory.java:49)
        at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:862)
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:416)
        ... 19 more
Caused by: java.lang.IllegalArgumentException: Cannot find data format in registry with ref: zipfile
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:142)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:115)
        at org.apache.camel.reifier.dataformat.CustomDataFormatReifier.doCreateDataFormat(CustomDataFormatReifier.java:35)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.createDataFormat(DataFormatReifier.java:266)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:151)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:111)
        at org.apache.camel.reifier.MarshalReifier.createProcessor(MarshalReifier.java:35)
        at org.apache.camel.reifier.ProcessorReifier.makeProcessor(ProcessorReifier.java:847)
        at org.apache.camel.reifier.ProcessorReifier.addRoutes(ProcessorReifier.java:588)
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:237)
        ... 23 more
2023-09-27 03:14:35,323 INFO [blob-connector|task-0] Stopping CamelSinkTask connector task (org.apache.camel.kafkaconnector.CamelSinkTask) [task-thread-blob-connector-0]

I also tested with org.apache.camel.model.dataformat.ZipFileDataFormat as the value, and got the following error instead.

2023-09-27 03:13:40,704 ERROR [blob-connector|task-0] WorkerSinkTask{id=blob-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-blob-connector-0]
org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context
        at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:159)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:315)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
        at org.apache.camel.RuntimeCamelException.wrapRuntimeException(RuntimeCamelException.java:66)
        at org.apache.camel.support.service.BaseService.doFail(BaseService.java:413)
        at org.apache.camel.impl.engine.AbstractCamelContext.doFail(AbstractCamelContext.java:3550)
        at org.apache.camel.support.service.BaseService.fail(BaseService.java:342)
        at org.apache.camel.impl.engine.AbstractCamelContext.failOnStartup(AbstractCamelContext.java:5204)
        at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2642)
        at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
        at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2649)
        at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:262)
        at org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)
        at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
        at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:152)
        ... 9 more
Caused by: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:433)
        at org.apache.camel.impl.engine.AbstractCamelContext.doInit(AbstractCamelContext.java:2956)
        at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
        at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2630)
        ... 15 more
Caused by: org.apache.camel.component.kamelet.KameletNotFoundException: Kamelet with id ckcMarshal not found in locations: classpath:/kamelets
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:421)
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:430)
        ... 18 more
Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route ckcMarshal-1 at: >>> Marshal[CustomDataFormat[{{marshal}}]] <<< in route: Route(ckcMarshal-1)[From[kamelet://source?routeId=ckcMarshal... because of Resolving dataformat: org.apache.camel.model.dataformat.ZipFileDataFormat detected type conflict: Not a DataFormat implementation. Found: org.apache.camel.model.dataformat.ZipFileDataFormat
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:241)
        at org.apache.camel.reifier.RouteReifier.createRoute(RouteReifier.java:75)
        at org.apache.camel.impl.DefaultModelReifierFactory.createRoute(DefaultModelReifierFactory.java:49)
        at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:862)
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:416)
        ... 19 more
Caused by: java.lang.IllegalArgumentException: Resolving dataformat: org.apache.camel.model.dataformat.ZipFileDataFormat detected type conflict: Not a DataFormat implementation. Found: org.apache.camel.model.dataformat.ZipFileDataFormat
        at org.apache.camel.impl.engine.DefaultDataFormatResolver.createDataFormatFromResource(DefaultDataFormatResolver.java:76)
        at org.apache.camel.impl.engine.DefaultDataFormatResolver.createDataFormat(DefaultDataFormatResolver.java:47)
        at org.apache.camel.impl.engine.AbstractCamelContext.lambda$resolveDataFormat$3(AbstractCamelContext.java:4473)
        at java.base/java.util.Optional.orElseGet(Optional.java:364)
        at org.apache.camel.impl.engine.AbstractCamelContext.lambda$resolveDataFormat$4(AbstractCamelContext.java:4473)
        at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
        at org.apache.camel.impl.engine.AbstractCamelContext.resolveDataFormat(AbstractCamelContext.java:4464)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:140)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:115)
        at org.apache.camel.reifier.dataformat.CustomDataFormatReifier.doCreateDataFormat(CustomDataFormatReifier.java:35)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.createDataFormat(DataFormatReifier.java:266)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:151)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:111)
        at org.apache.camel.reifier.MarshalReifier.createProcessor(MarshalReifier.java:35)
        at org.apache.camel.reifier.ProcessorReifier.makeProcessor(ProcessorReifier.java:847)
        at org.apache.camel.reifier.ProcessorReifier.addRoutes(ProcessorReifier.java:588)
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:237)
        ... 23 more

Could I get a more detailed explanation of how to specify this option?
My configuration so far looks like this:

spec:
  class: org.apache.camel.kafkaconnector.azurestorageblobsink.CamelAzurestorageblobsinkSinkConnector
  config:
    camel.kamelet.azure-storage-blob-sink.accountName: "my_account"
    camel.kamelet.azure-storage-blob-sink.accessKey: "xxx"
    camel.kamelet.azure-storage-blob-sink.containerName: "my_container"
    camel.beans.aggregate: "#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator"
    camel.aggregation.size: 10
    camel.aggregation.timeout: 500000
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    topics: my-logs
    camel.sink.marshal: zipfile
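
For reference, here is an untested sketch of a variant that might address both errors. It rests on assumptions that are not confirmed anywhere in this thread. The first trace suggests that the value of camel.sink.marshal is looked up by name in the Camel registry. The second suggests that org.apache.camel.model.dataformat.ZipFileDataFormat is the route-model class rather than a DataFormat implementation. The sketch assumes the implementation class is org.apache.camel.dataformat.zipfile.ZipFileDataFormat from the camel-zipfile artifact, and that this jar is available in the connector's plugin directory. The camel.beans. prefix is the same mechanism already used for the aggregator in the configuration above; the bean name zipfile is chosen only for illustration.

```yaml
spec:
  config:
    # Assumption: the camel-zipfile jar has been added to the connector plugin directory.
    # Assumption: this registers a bean named "zipfile" so the registry lookup by ref can find it.
    camel.beans.zipfile: "#class:org.apache.camel.dataformat.zipfile.ZipFileDataFormat"
    # Refers to the bean name bound above, not to a class name.
    camel.sink.marshal: zipfile
```

If the registry lookup does work this way, any other bean name would do as long as camel.sink.marshal uses the same name.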