amazon-kinesis-client-python: Records not being processed with sample app - simple print(data) not working

I’ve gone through several issues and SO questions and haven’t been able to get the sample app to work.

Here’s how to reproduce this:

    # Run dynamodb
    docker run -d -p 8000:8000 dwmkerr/dynamodb

    # Create java8 container
    docker run -it jkosgei/alpine-java8 ash # Source at https://github.com/jonathan-kosgei/alpine-java8

    export JAVA_HOME=/usr/lib/jvm/java-1.8-openjdk/jre
    export AWS_ACCESS_KEY_ID=
    export AWS_SECRET_ACCESS_KEY=

    apk add --no-cache python3 curl git
    git clone https://github.com/awslabs/amazon-kinesis-client-python.git
    cd amazon-kinesis-client-python

    # edit sample.properties with sample below

    # Start sample producer
    sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster

    # Do setup
    python3 setup.py download_jars &&\
    python3 setup.py install

    # Run sample app
    `amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties prod.properties`

The following are the only things I’ve changed in my prod.properties:

    executableName = python3 /amazon-kinesis-client-python/samples/sample_kclpy_app.py
    dynamoDBEndpoint = http://127.0.0.1:8000
    applicationName = test
    maxRecords = 10000
    idleTimeBetweenReadsInMillis = 200
    callProcessRecordsEvenForEmptyRecordList = true
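
The daemon launches whatever command `executableName` contains. It can therefore help to confirm that the script path in that command exists and is the file you actually edited. Below is a minimal sketch of such a check. `read_properties` and `check_executable` are hypothetical helpers, not part of amazon_kclpy. The parser is deliberately simplified: it handles only `key = value` and `key: value` lines, not the full Java `.properties` syntax (line continuations, escapes, whitespace-only separators).

```python
import os
import shlex


def read_properties(path):
    """Parse simple `key = value` / `key: value` lines (simplified, hypothetical)."""
    props = {}
    with open(path) as fh:
        for raw in fh:
            line = raw.strip()
            # Skip blank lines and comments ('#' or '!').
            if not line or line[0] in '#!':
                continue
            # Split on whichever of '=' or ':' appears first, as Java properties do.
            idx = min((i for i in (line.find('='), line.find(':')) if i >= 0), default=-1)
            if idx < 0:
                continue
            props[line[:idx].strip()] = line[idx + 1:].strip()
    return props


def check_executable(props):
    """Return (command_words, missing_paths) for the executableName entry."""
    words = shlex.split(props.get('executableName', ''))
    # Any word containing a path separator is treated as a file that should exist.
    missing = [w for w in words if os.sep in w and not os.path.exists(w)]
    return words, missing
```

For example, `check_executable(read_properties('prod.properties'))` should return `['python3', '/amazon-kinesis-client-python/samples/sample_kclpy_app.py']` and an empty list of missing paths if the file is in place.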

In sample_kclpy_app.py, my process_record function looks like this:

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this record.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        print('Data: ', data)
        with open('/log.txt', 'a') as the_file:
            the_file.write(data)
        return
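
For comparison, here is a minimal standalone sketch of the same logic that writes its debug output to stderr rather than stdout. In the KCL multilang setup, the child process exchanges protocol messages with the MultiLangDaemon over its stdin/stdout. That makes stdout a poor place for debug output. The daemon does read the child's stderr, as the "Reading STDERR" line in the logs shows. The sketch also assumes `data` may arrive as `bytes`, since the sample passes the base64-decoded payload and `base64.b64decode` returns `bytes` in Python 3. Writing `bytes` to a file opened in text mode raises a `TypeError`. This is a free function rather than the sample's method, and the `log_path` parameter is hypothetical.

```python
import sys


def process_record(data, partition_key, sequence_number, sub_sequence_number,
                   log_path='/log.txt'):
    """Standalone debugging version of process_record (hypothetical sketch)."""
    # Decode bytes payloads so they can be written as text.
    if isinstance(data, bytes):
        text = data.decode('utf-8', errors='replace')
    else:
        text = data
    # Keep stdout free for KCL protocol messages; log diagnostics to stderr.
    sys.stderr.write('Data: {}\n'.format(text))
    sys.stderr.flush()
    # Append one record per line so records don't run together in the file.
    with open(log_path, 'a') as the_file:
        the_file.write(text + '\n')
```

Note that this only helps once records are actually delivered. If the daemon never gets past initialization, `process_record` is never called at all.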

My logs:

    /amazon-kinesis-client-python # `amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties prod.properties`
    Mar 07, 2018 11:36:26 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator getConfiguration
    INFO: Value of workerId is not provided in the properties. WorkerId is automatically assigned as: 13bdbb57-e701-4be1-b2ca-6b808fa95b73
    Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
    INFO: Successfully set property regionName with value us-east-1
    Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
    INFO: Successfully set property idleTimeBetweenReadsInMillis with value 200
    Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
    INFO: Successfully set property maxRecords with value 10000
    Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
    INFO: Successfully set property callProcessRecordsEvenForEmptyRecordList with value true
    Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
    INFO: Successfully set property dynamoDBEndpoint with value http://127.0.0.1:8000
    Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
    INFO: Successfully set property initialPositionInStream with value TRIM_HORIZON
    Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig buildExecutorService
    INFO: Using a cached thread pool.
    Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig <init>
    INFO: Running test to process stream words with executable python3 
    Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
    INFO: Using workerId: bleh
    Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
    INFO: Using credentials with access key id: bleh
    Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
    INFO: MultiLangDaemon is adding the following fields to the User Agent: amazon-kinesis-client-library-java-1.9.0 amazon-kinesis-multi-lang-daemon/1.0.1 python/3.6 python3
    Mar 07, 2018 11:36:28 AM com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator <init>
    INFO: With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
    Mar 07, 2018 11:36:28 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
    INFO: Initialization attempt 1
    Mar 07, 2018 11:36:28 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
    INFO: Initializing LeaseCoordinator
    Mar 07, 2018 11:36:37 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator initialize
    INFO: Created new lease table for coordinator with initial read capacity of 10 and write capacity of 10.
    Mar 07, 2018 11:36:41 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
    INFO: Syncing Kinesis shard info
    Mar 07, 2018 11:36:43 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
    INFO: Starting LeaseCoordinator
    Mar 07, 2018 11:36:46 AM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
    INFO: Worker 13bdbb57-e701-4be1-b2ca-6b808fa95b73 saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
    Mar 07, 2018 11:36:48 AM com.amazonaws.services.kinesis.leases.impl.LeaseTaker takeLeases
    INFO: Worker 13bdbb57-e701-4be1-b2ca-6b808fa95b73 successfully took 1 leases: shardId-000000000000
    Mar 07, 2018 11:36:56 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker run
    INFO: Initialization complete. Starting worker loop.
    Mar 07, 2018 11:36:58 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker infoForce
    INFO: Created new shardConsumer for : ShardInfo [shardId=shardId-000000000000, concurrencyToken=bleh, parentShardIds=[], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}]
    Mar 07, 2018 11:36:58 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
    INFO: No need to block on parents [] of shard shardId-000000000000
    Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
    INFO: Initializing shard shardId-000000000000 with TRIM_HORIZON
    Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
    INFO: Writing InitializeMessage to child process for shard shardId-000000000000
    Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
    INFO: Starting: Reading STDERR for shardId-000000000000
    Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.MessageWriter call
    INFO: Message size == 110 bytes for shard shardId-000000000000
    Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
    INFO: Starting: Reading next message from STDIN for shardId-000000000000
    Mar 07, 2018 11:37:29 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
    INFO: Current stream shard assignments: shardId-000000000000
    Mar 07, 2018 11:37:29 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
    INFO: Sleeping ...
    Mar 07, 2018 11:38:39 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
    INFO: Current stream shard assignments: shardId-000000000000
    Mar 07, 2018 11:38:39 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
    INFO: Sleeping ...
    Mar 07, 2018 11:39:42 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
    INFO: Current stream shard assignments: shardId-000000000000
    Mar 07, 2018 11:39:42 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
    INFO: Sleeping ...
    Mar 07, 2018 11:40:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
    INFO: Current stream shard assignments: shardId-000000000000
    Mar 07, 2018 11:40:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
    INFO: Sleeping ...
    Mar 07, 2018 11:41:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
    INFO: Current stream shard assignments: shardId-000000000000
    Mar 07, 2018 11:41:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
    INFO: Sleeping ...
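
The logs stop right after the daemon writes the InitializeMessage and starts waiting for the child's reply. One way to check, outside the daemon, whether the script answers at all is to feed it a single initialize message by hand. The following is a hedged sketch. `send_initialize` is a hypothetical helper. The message fields (`action`, `shardId`, `sequenceNumber`, `subSequenceNumber`) and the `{"action": "status", "responseFor": ...}` reply shape are my assumptions about the multilang protocol and may differ between amazon_kclpy versions. The subprocess keyword arguments are chosen to work on Python 3.6, which the logs show.

```python
import json
import subprocess


def send_initialize(command, shard_id='shardId-000000000000', timeout=10):
    """Feed one initialize message to a KCL child process (hypothetical helper).

    Returns (replies, stray_lines, stderr_text): replies are JSON objects with an
    "action" key found on stdout; stray_lines are any other non-empty stdout
    lines, such as print() output. Raises subprocess.TimeoutExpired if the
    child has not exited within `timeout` seconds.
    """
    # Assumed message shape; field names may vary between KCL versions.
    message = json.dumps({
        'action': 'initialize',
        'shardId': shard_id,
        'sequenceNumber': 'TRIM_HORIZON',
        'subSequenceNumber': 0,
    }) + '\n'
    # run() writes the message and then closes stdin, so a child that reads
    # until EOF should exit after handling initialize.
    result = subprocess.run(command, input=message,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True, timeout=timeout)
    replies, stray = [], []
    for line in result.stdout.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            parsed = json.loads(line)
        except ValueError:
            parsed = None
        if isinstance(parsed, dict) and 'action' in parsed:
            replies.append(parsed)
        else:
            stray.append(line)
    return replies, stray, result.stderr
```

For example: `send_initialize(['python3', '/amazon-kinesis-client-python/samples/sample_kclpy_app.py'])`. Either of two results would point at the script rather than the daemon: an empty `replies` list with a traceback in the returned stderr text, or a `subprocess.TimeoutExpired`.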

About this issue

  • State: open
  • Created 6 years ago
  • Reactions: 2
  • Comments: 31 (9 by maintainers)

Most upvoted comments

Yes, that seemed odd. I’m using the code at https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample_kclpy_app.py and haven’t made any changes except adding print(data) to process_record in that file.

I encountered a similar issue to the one @jonathan-kosgei mentioned: a simple print won’t show up in the logs. I noticed that the build folder wasn’t reflecting my changes, so I ran the python setup.py install command. After that, the changes were reflected in the build folder, and my changes to sample_kclpy_app.py showed up in the logs. I also noticed that sample_kclpy_app.py gets copied into /Library/Frameworks/Python.framework/Versions/3.7/bin/sample_kclpy_app.py when I run the install command. This might be why changes weren’t being reflected. I could be on a completely different page here, but I hope it helps.
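
If `setup.py install` copies sample_kclpy_app.py into the Python `bin` directory, then edits made in the source tree only take effect when the daemon runs the edited copy. The sketch below finds the copy of the script on `PATH` and compares its contents with the file you edited. `find_installed_copy` and its `search_path` parameter are hypothetical. The standard-library calls are `shutil.which` and `filecmp.cmp`.

```python
import filecmp
import shutil


def find_installed_copy(edited_path, script_name='sample_kclpy_app.py', search_path=None):
    """Return (installed_path, same_contents) for script_name (hypothetical helper).

    installed_path is None when no executable copy is found; search_path
    overrides the PATH lookup, as in shutil.which.
    """
    installed = shutil.which(script_name, path=search_path)
    if installed is None:
        return None, False
    # shallow=False compares file contents, not just os.stat() signatures.
    return installed, filecmp.cmp(installed, edited_path, shallow=False)
```

If `same_contents` is `False`, re-running the install (or pointing `executableName` at the edited file) would bring the two back in sync.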

It looks like it’s getting stuck waiting on the response from the initialize method:

    20:42:27.998 [multi-lang-daemon-0004] INFO com.amazonaws.services.kinesis.multilang.LineReaderTask - Starting: Reading next message from STDIN for shardId-000000000000
    20:42:28.085 [multi-lang-daemon-0000] DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer - Previous INITIALIZE task still pending for shard shardId-000000000000 since 1005 ms ago. Not submitting new task.
    20:42:28.285 [multi-lang-daemon-0000] DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer - Previous INITIALIZE task still pending for shard shardId-000000000000 since 1205 ms ago. Not submitting new task.
    20:42:28.486 [multi-lang-daemon-0000] DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer - Previous INITIALIZE task still pending for shard shardId-000000000000 since 1406 ms ago. Not submitting new task.

Is there something happening in the initialize(...) method that could be preventing the method from returning?
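
To make the question concrete: on the child side, the acknowledgement for a message is written only after the handler for that message returns. The following simplified sketch of such a loop is hypothetical and is not amazon_kclpy's actual implementation. The action names `initialize`/`processRecords` and the status reply shape are assumptions, and `processor` is any object with `initialize` and `process_records` methods. It shows why an `initialize` that never returns would be consistent with the repeated "Previous INITIALIZE task still pending" lines.

```python
import json
import sys


def run_child(processor, stdin=None, stdout=None):
    """Simplified, hypothetical multilang child loop: dispatch, then acknowledge."""
    stdin = stdin if stdin is not None else sys.stdin
    stdout = stdout if stdout is not None else sys.stdout
    for raw in stdin:
        line = raw.strip()
        if not line:
            continue
        message = json.loads(line)
        action = message['action']
        if action == 'initialize':
            # If this call blocks, no status reply is ever written and the
            # daemon keeps waiting on the INITIALIZE task.
            processor.initialize(message)
        elif action == 'processRecords':
            processor.process_records(message)
        # Acknowledge only after the handler has returned.
        stdout.write(json.dumps({'action': 'status', 'responseFor': action}) + '\n')
        stdout.flush()
```

In this model, anything slow or blocking inside `initialize` (a network call, a lock, reading from stdin) delays the reply by the same amount.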