pgsync: First sync was successful but further updates are not synced

PGSync version: 2.1.9

Postgres version: 11.12 (AWS RDS)

Elasticsearch version: 7.10.2

Redis version: 6.2.6

Python version: 3.9.9

Problem Description: I’m using the latest pgsync from PyPI in a Docker environment. The first indexing run was successful, but subsequent syncs do not appear to work.

opensearch-pgsync  | Syncing kyuluxmain Db: [6,972] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [6,978] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [6,984] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,012] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,018] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,024] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,030] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,036] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,066] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,072] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,108] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,114] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,120] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,126] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,132] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,150] => Redis: [0] => Elastic: [0] ...
opensearch-pgsync  | Syncing kyuluxmain Db: [7,152] => Redis: [0] => Elastic: [0] ...

I checked Kibana and counted the number of records in the index, but it has not changed.

Here are my env vars:

    environment:
      - LOG_LEVEL=INFO
      - QUERY_CHUNK_SIZE=1000
      - POLL_TIMEOUT=1
      - ELASTICSEARCH_SCHEME=https
      - ELASTICSEARCH_HOST=******
      - ELASTICSEARCH_PORT=9200
      - ELASTICSEARCH_USER=*****
      - ELASTICSEARCH_PASSWORD=*****
      - ELASTICSEARCH_TIMEOUT=100
      - ELASTICSEARCH_CHUNK_SIZE=100
      - ELASTICSEARCH_VERIFY_CERTS=false
      - ELASTICSEARCH_USE_SSL=true
      - ELASTICSEARCH_SSL_SHOW_WARN=false
      - ELASTICSEARCH_STREAMING_BULK=true
      - ELASTICSEARCH_MAX_RETRIES=10
      - PG_HOST=*****
      - PG_PORT=5432
      - PG_USER=*****
      - PG_PASSWORD=*****
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - REDIS_AUTH=*****

Is there anything I can check to see whether the sync is working or not?
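
One rough way to check is to read the counters out of the progress lines themselves. The sketch below is not part of pgsync. It assumes the three bracketed numbers mean events seen from Postgres, items pending in Redis, and documents written to Elasticsearch, in that order; the helper names `parse_counters` and `looks_stalled` are made up for illustration.

```python
import re

# Matches pgsync progress lines such as:
#   Syncing kyuluxmain Db: [6,972] => Redis: [0] => Elastic: [0] ...
LINE_RE = re.compile(
    r"Syncing (?P<name>\S+) Db: \[(?P<db>[\d,]+)\] => "
    r"Redis: \[(?P<redis>[\d,]+)\] => Elastic: \[(?P<es>[\d,]+)\]"
)


def parse_counters(line):
    """Return (db, redis, elastic) as ints, or None if the line does not match."""
    match = LINE_RE.search(line)
    if match is None:
        return None
    # Counters are printed with thousands separators, e.g. "6,972".
    return tuple(
        int(match.group(key).replace(",", "")) for key in ("db", "redis", "es")
    )


def looks_stalled(lines):
    """True if the Db counter grew while the Elastic counter stayed constant."""
    samples = [c for c in map(parse_counters, lines) if c is not None]
    if len(samples) < 2:
        return False  # not enough data to judge
    db_grew = samples[-1][0] > samples[0][0]
    elastic_constant = len({sample[2] for sample in samples}) == 1
    return db_grew and elastic_constant
```

Feeding it the container output (for example the lines saved from `docker logs`) gives a quick yes/no signal. It only reflects what the log prints, so comparing the index document count before and after a known row change remains the more direct check.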

About this issue

  • State: open
  • Created 3 years ago
  • Comments: 21 (8 by maintainers)

Most upvoted comments

Same issue here: running pgsync syncs the initial data successfully, but the first data update in the Db shows this error:

2021-12-22 05:03:46.439:ERROR:pgsync.sync: Exception 'NoneType' object is not subscriptable
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 875, in sync
    self.es.bulk(self.index, docs)
  File "/usr/local/lib/python3.8/dist-packages/pgsync/elastichelper.py", line 124, in bulk
    for _ in helpers.parallel_bulk(
  File "/usr/local/lib/python3.8/dist-packages/elasticsearch/helpers/actions.py", line 472, in parallel_bulk
    for result in pool.imap(
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 868, in next
    raise value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 144, in _helper_reraises_exception
    raise ex
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 388, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "/usr/local/lib/python3.8/dist-packages/elasticsearch/helpers/actions.py", line 155, in _chunk_actions
    for action, data in actions:
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 714, in _payloads
    filters = self._update(
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 435, in _update
    primary_values: list = [
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 436, in <listcomp>
    payload_data[key] for key in node.model.primary_keys
TypeError: 'NoneType' object is not subscriptable
Exception in thread Thread-16:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 904, in poll_redis
    self.on_publish(payloads)
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 990, in on_publish
    self.sync(self._payloads(_payloads))
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 875, in sync
    self.es.bulk(self.index, docs)
  File "/usr/local/lib/python3.8/dist-packages/pgsync/elastichelper.py", line 124, in bulk
    for _ in helpers.parallel_bulk(
  File "/usr/local/lib/python3.8/dist-packages/elasticsearch/helpers/actions.py", line 472, in parallel_bulk
    for result in pool.imap(
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 868, in next
    raise value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 144, in _helper_reraises_exception
    raise ex
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 388, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "/usr/local/lib/python3.8/dist-packages/elasticsearch/helpers/actions.py", line 155, in _chunk_actions
    for action, data in actions:
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 714, in _payloads
    filters = self._update(
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 435, in _update
    primary_values: list = [
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 436, in <listcomp>
    payload_data[key] for key in node.model.primary_keys
TypeError: 'NoneType' object is not subscriptable

After that, any data update makes the logs look like this:

Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [3] => Redis: [1] => Elastic: [0] ...

The count increases in Db but not in Redis or ES.
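
The TypeError in the traceback above is raised by the list comprehension `payload_data[key] for key in node.model.primary_keys`, which means `payload_data` was `None` when it was indexed. The sketch below only reproduces that failure mode in isolation. The function names are hypothetical, this is not pgsync's code, and why the payload arrives without data is not established in this thread.

```python
def primary_values(payload_data, primary_keys):
    """Hypothetical stand-in for the comprehension shown in the traceback."""
    return [payload_data[key] for key in primary_keys]


def safe_primary_values(payload_data, primary_keys):
    """Same lookup, but returns None when the payload carries no row data."""
    if payload_data is None:
        return None
    return [payload_data[key] for key in primary_keys]


if __name__ == "__main__":
    try:
        primary_values(None, ["id"])
    except TypeError as exc:
        # Same message as in the log above.
        print(f"TypeError: {exc}")

    print(safe_primary_values(None, ["id"]))
    print(safe_primary_values({"id": 7}, ["id"]))
```

A guard like this would only stop the exception. Whether skipping such an event is acceptable depends on what the event represents, so capturing the raw payload that triggers the error would be useful detail for a bug report.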

@camirus27 @tthanh I feel this issue is different from the original one. Can you please create a separate issue with as much detail as you can, so that I can track it better?