aws-msk-iam-auth: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type:software.amazon.msk.auth.iam.internals.AWSCredentialsCallback]
Hello,
We are using the aws-msk-iam-auth library to authenticate Zeppelin and Flink processes against an MSK cluster with IAM authentication enabled. The following error is thrown when trying to run more than one task on the same TaskManager:
Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type:software.amazon.msk.auth.iam.internals.AWSCredentialsCallback]) occurred when evaluating SASL token received from the Kafka Broker
Library version 1.1.0 has been imported from an S3 bucket into the Studio notebook and added as a dependency. The following configuration is used:
"bootstrap.servers" -> "<MSKBootstrapServers>",
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "AWS_MSK_IAM",
"sasl.jaas.config" -> "software.amazon.msk.auth.iam.IAMLoginModule required;",
"sasl.client.callback.handler.class" -> "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
As the default artifact included in the notebook (flink-sql-connector-kafka_2.12) shades the Kafka client dependencies, the following Gradle build was used to apply the same relocation to the Kafka client dependencies within aws-msk-iam-auth:
apply plugin: 'com.github.johnrengelman.shadow'  // requires the Shadow plugin on the buildscript classpath

ext {
    projectName = "aws-msk-iam-auth-flink-shaded"
    projectDescription = "Shades Kafka client dependencies in aws-msk-iam-auth library so that they can match Kafka Client dependency in the shaded flink-kafka-connector"
    projectArtifactName = "aws-msk-iam-auth-flink-shaded"
}

dependencies {
    implementation group: 'software.amazon.msk', name: 'aws-msk-iam-auth', version: '1.1.0'
}

jar {
    // only the shaded artifact is published
    enabled = false
    dependsOn(shadowJar)
}

shadowJar {
    archiveClassifier.set('')
    archiveName("${projectArtifactName}.jar")
    // same relocation prefix that flink-sql-connector-kafka_2.12 applies to the Kafka client
    relocate('org.apache.kafka', 'org.apache.flink.kafka.shaded.org.apache.kafka')
    mergeServiceFiles()
}
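As a quick sanity check of the relocation (a hedged sketch, assuming the shaded aws-msk-iam-auth jar built above and flink-sql-connector-kafka_2.12 are both on the notebook classpath), the callback handler should implement the connector's relocated AuthenticateCallbackHandler; the relocated package prefix matches the one visible in the stack trace below:
// Verify that the relocation prefix in the shaded auth jar matches the connector's
val relocatedInterface = Class.forName(
  "org.apache.flink.kafka.shaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler")
val handlerClass = Class.forName("software.amazon.msk.auth.iam.IAMClientCallbackHandler")
// should print true when the two relocations line up
println(relocatedInterface.isAssignableFrom(handlerClass))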
The setup works and authenticates with MSK without issues when a single task is executed by the TaskManager. However, as mentioned above, when more than one task runs on the same TaskManager, the following exception is thrown:
java.io.IOException: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b8fc1c328b567c9aab180e1c7af33983)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b8fc1c328b567c9aab180e1c7af33983)
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1719)
at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
... 14 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b8fc1c328b567c9aab180e1c7af33983)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:116)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
... 3 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:114)
... 18 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:198)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:191)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:185)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:520)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:394)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type:software.amazon.msk.auth.iam.internals.AWSCredentialsCallback]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
Caused by: javax.security.sasl.SaslException: Exception while evaluating challenge
at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:113)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:474)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:474)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:381)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:293)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:233)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:368)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1926)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1894)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:553)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:474)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:470)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:529)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:724)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:549)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type:software.amazon.msk.auth.iam.internals.AWSCredentialsCallback
at software.amazon.msk.auth.iam.IAMClientCallbackHandler.handle(IAMClientCallbackHandler.java:80)
at software.amazon.msk.auth.iam.internals.IAMSaslClient.generateClientMessage(IAMSaslClient.java:138)
at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:95)
... 30 more
Could you please help with the issue?
Steps to reproduce:
- Create a Studio notebook with a parallelism of 5 and a parallelism per KPU of 5.
- Add the required shaded dependencies as described above.
- Have the notebook process some data with Flink Streaming SQL.
- Run another task from the notebook (see the sketch after this list).
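For illustration only, a hedged sketch of the last two steps from the Scala interpreter; the table names are placeholders, and stenv is the StreamTableEnvironment exposed by the Zeppelin Flink interpreter:
// First streaming insert: one Kafka-backed task starts and authenticates fine
stenv.executeSql("INSERT INTO sink_table_a SELECT * FROM kafka_source_a")
// Second streaming insert scheduled on the same TaskManager: its Kafka client fails to
// authenticate with the UnsupportedCallbackException shown above
stenv.executeSql("INSERT INTO sink_table_b SELECT * FROM kafka_source_b")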
About this issue
- State: closed
- Created 3 years ago
- Comments: 23 (10 by maintainers)
Commits related to this issue
- [https://github.com/aws/aws-msk-iam-auth/issues/36] Adding a proxy to delegate to appropriate IAMSaslClientProvider based on ClassLoader — committed to dannycranmer/aws-msk-iam-auth by dannycranmer 2 years ago
- [https://github.com/aws/aws-msk-iam-auth/issues/36] Adding a proxy to delegate to appropriate IAMSaslClientProvider based on ClassLoader (#82) — committed to aws/aws-msk-iam-auth by dannycranmer 2 years ago
- [https://github.com/aws/aws-msk-iam-auth/issues/36] Update README for multi classloader support — committed to dannycranmer/aws-msk-iam-auth by dannycranmer 2 years ago
- [https://github.com/aws/aws-msk-iam-auth/issues/36] Update README for multi classloader support (#86) — committed to aws/aws-msk-iam-auth by dannycranmer 2 years ago
Hello @lauroawsps, we have now addressed this issue for KDA Studio. We have added support for the non-relocated Kafka connector and MSK IAM as service-managed dependencies. This means you will no longer need to rebuild and provide the connector. This change is not yet available on the console; if you would like to test it out now, you can update your application using the CLI. Here is an example request (please note that you will need to provide the entire list of custom artifacts here; any additional JARs in S3 you want to include should be added to the list (reference)):
Hello @lauroawsps, just to confirm that we (Kinesis Data Analytics) also ran into this issue and are currently working towards a fix. I will keep you posted. Sorry for the inconvenience.
No, I am referring to two parallel Flink jobs here. However, the same problem applies to a single job stop/start (failover).
Apologies, I cannot provide an ETA at this point.
Some supplementary information:
A block diagram attached to the original comment illustrates the problem.
We have confirmed the issue also exists for other Kafka plugins, for example SCRAM. However, when using SCRAM the majority of callbacks are usable because they are also loaded from the parent classloader (since the package prefix is javax):
- javax.security.auth.callback.NameCallback
- javax.security.auth.callback.PasswordCallback
If you try to define any extensions via org.apache.kafka.common.security.scram.ScramExtensionsCallback, then it is rejected for the same reason as MSK IAM. The Kafka client simply swallows the exception with a debug message.
@dannycranmer Reopened as requested.
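To make the classloader aspect concrete, here is an illustrative sketch (not library code, and assuming aws-msk-iam-auth is on the application classpath with a resolvable jar location): JVM class identity is the pair of defining classloader and class name, so the same class loaded through a second, non-delegating loader is a different class, and instanceof/isInstance checks between the two sides always fail, which is what IAMClientCallbackHandler.handle reports as an unsupported callback type.
import java.net.URLClassLoader

// the callback type that IAMClientCallbackHandler fails to recognise in the report above
val callbackName = "software.amazon.msk.auth.iam.internals.AWSCredentialsCallback"

// the class as seen by the application classloader, and the jar it came from
val appLoaded = Class.forName(callbackName)
val jarUrl = appLoaded.getProtectionDomain.getCodeSource.getLocation

// a second loader over the same jar that does NOT delegate to the application classloader,
// similar in spirit to a plugin or user-code classloader loading its own copy of the library
val isolated = new URLClassLoader(Array(jarUrl), null)
val pluginLoaded = isolated.loadClass(callbackName)

// same fully qualified name, different defining classloaders => different classes, so any
// instanceof / isInstance check between the two sides fails and the handler rejects the callback
println(appLoaded == pluginLoaded) // false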