dbt-core: OSError: [Errno 38] Function not implemented

Describe the bug

Using the DBT Python API, I run dbt via dbt.main.handle_and_check().

I’m running it inside an AWS Lambda function that is deployed as a zip and unpacked by serverless-python-requirements.

I’m setting both --profiles-dir and --project-dir to os.path.dirname(__file__); profiles.yml and dbt_project.yml live side by side under ./data_warehouse/snowflake_dbt.

The error I’m getting: OSError: [Errno 38] Function not implemented

Any idea how to solve this? When I run the Lambda locally, everything works fine. I think it’s due to the combination of AWS Lambda and DBT…
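
For context, my call is roughly equivalent to this (a simplified sketch; the real handler builds the argument list from the incoming event):

import os
import dbt.main

def run_dbt():
    base_dir = os.path.dirname(__file__)
    # "run" stands in for whatever command the event selects
    args = [
        "run",
        "--profiles-dir", base_dir,
        "--project-dir", base_dir,
    ]
    # handle_and_check parses the CLI-style args and returns (results, success)
    return dbt.main.handle_and_check(args)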

Screenshots and log output

Running with dbt=0.18.1
--- Logging error ---
Traceback (most recent call last):
  File "/var/task/data_warehouse/snowflake_dbt/main.py", line 62, in handler
    results, success = run_dbt(e.name.value, e.macro, e.event_vars, e.event_args)
  File "/var/task/data_warehouse/snowflake_dbt/main.py", line 94, in run_dbt
    results, success = dbt.main.handle_and_check(params)  # pylint: disable=maybe-no-member
  File "/tmp/sls-py-req/dbt/main.py", line 202, in handle_and_check
    task, res = run_from_args(parsed)
  File "/tmp/sls-py-req/dbt/main.py", line 241, in run_from_args
    task = parsed.cls.from_args(args=parsed)
  File "/tmp/sls-py-req/dbt/task/base.py", line 156, in from_args
    return super().from_args(args)
  File "/tmp/sls-py-req/dbt/task/base.py", line 103, in from_args
    return cls(args, config)
  File "/tmp/sls-py-req/dbt/task/runnable.py", line 55, in __init__
    super().__init__(args, config)
  File "/tmp/sls-py-req/dbt/task/base.py", line 151, in __init__
    register_adapter(self.config)
  File "/tmp/sls-py-req/dbt/adapters/factory.py", line 182, in register_adapter
    FACTORY.register_adapter(config)
  File "/tmp/sls-py-req/dbt/adapters/factory.py", line 105, in register_adapter
    adapter: Adapter = adapter_type(config)  # type: ignore
  File "/tmp/sls-py-req/dbt/adapters/base/impl.py", line 162, in __init__
    self.connections = self.ConnectionManager(config)
  File "/tmp/sls-py-req/dbt/adapters/base/connections.py", line 42, in __init__
    self.lock: RLock = flags.MP_CONTEXT.RLock()
  File "/var/lang/lib/python3.7/multiprocessing/context.py", line 72, in RLock
    return RLock(ctx=self.get_context())
  File "/var/lang/lib/python3.7/multiprocessing/synchronize.py", line 187, in __init__
    SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)
  File "/var/lang/lib/python3.7/multiprocessing/synchronize.py", line 59, in __init__
    unlink_now)
OSError: [Errno 38] Function not implemented

I also set threads to 1 in profiles.yml, but that didn’t help…
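
For what it’s worth, the same error reproduces in the Lambda Python runtime without dbt at all (minimal sketch; multiprocessing locks are backed by POSIX semaphores, which the Lambda environment doesn’t provide):

import multiprocessing
import threading

def handler(event, context):
    threading.RLock()        # fine: thread locks need no shared memory
    multiprocessing.RLock()  # on Lambda this raises OSError: [Errno 38] Function not implemented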

System information

Which database are you using dbt with?

  • postgres
  • redshift
  • bigquery
  • snowflake
  • other (specify: ____________)

The output of dbt --version:

installed version: 0.18.1
   latest version: 0.18.1

Up to date!

Plugins:
  - bigquery: 0.18.1
  - snowflake: 0.18.1
  - redshift: 0.18.1
  - postgres: 0.18.1

The operating system you’re using:

The output of python --version: Python 3.7

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Reactions: 1
  • Comments: 19 (4 by maintainers)


Most upvoted comments

I am also looking for options to run dbt in AWS Lambda; curious to know if there is any workaround available?

A possible workaround is to move to a thread execution context, which does work inside AWS Lambda. The following seems to work for my use cases, but it very likely needs more testing:

import queue
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor

import dbt.flags as dbt_flags


# Override multiprocessing.dummy.Pool (the thread pool dbt uses) with a
# ThreadPoolExecutor that doesn't use any shared-memory semaphore locks
class CustomThreadPool:
    def __init__(self, num_threads):
        self.pool = ThreadPoolExecutor(max_workers=num_threads)

    # provide the same interface expected by dbt.task.runnable
    def apply_async(self, func, args, callback):

        def future_callback(fut):
            return callback(fut.result())

        self.pool.submit(func, *args).add_done_callback(future_callback)

    # we would need to actually keep a "closed" attribute around and check it properly
    def close(self):
        pass

    # shutdown(wait=True) mimics "join", whereas shutdown(wait=False) mimics "terminate"
    def join(self):
        self.pool.shutdown(wait=True)


import multiprocessing.dummy
multiprocessing.dummy.Pool = CustomThreadPool


# Replace Multiprocessing context with threaded context
# The objects mostly have the same api
class ThreadedContext:
    Process = threading.Thread
    Lock = threading.Lock
    RLock = threading.RLock
    Queue = queue.Queue

def get_threaded_context():
    return ThreadedContext()

# override both just in case :)
dbt_flags._get_context = get_threaded_context
dbt_flags.MP_CONTEXT = ThreadedContext()


def handler(event, context):
    # when imported here, we're pretty sure the monkey-patching above already took place
    # it also makes testing easier (AWS Lambda keeps old imports around on warm starts)
    import dbt.main as dbt_main
    dbt_main.log_manager._file_handler.disabled = True
    dbt_args = ["--no-write-json", "--no-use-colors", "run"]

    try:
        # main uses sys.exit, which doesn't play well with the AWS Lambda handler
        results, succeeded = dbt_main.handle_and_check(dbt_args)
    except Exception as e:
        # this is to ease debugging; the exception handling should be rewritten properly
        traceback.print_exc()
        results = None
        succeeded = False
        message = f"FAILED: {e}"
    else:
        message = "OK"
    # default lambda template
    return {
        'statusCode': 200,
        'body': {
            'message': message,
            'results': str(results),
            'succeeded': succeeded
        }
    }
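
For local testing, the handler can be invoked directly, e.g.:

if __name__ == "__main__":
    # quick local smoke test; event/context are unused above
    print(handler({}, None))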

I attempted to use the new programmatic invocations API available in the newly released dbt v1.5.0: https://docs.getdbt.com/reference/programmatic-invocations

I was hoping I could get around this “Function not implemented” issue. However, I found that not only do I still get the same failure, but the workarounds previously discussed in this issue also no longer seem to work. I’m not sure if they just need a small tweak, but I’m putting my stack trace below. The workarounds I’m referring to are the ones mentioned by @vchetyrkine earlier in the issue and (this is what I’m actually using) some work by @tomsej found here: https://github.com/tomsej/jaffle_shop_duckdb/commit/a850eaa986167e9bbdc858c870f00ec45b4eca40

Does anyone know how to work around that SemLock issue below with v1.5.0 dbt code?

20:17:53  Traceback (most recent call last):
  File \"/var/lang/lib/python3.9/site-packages/dbt/cli/requires.py\", line 86, in wrapper
    result, success = func(*args, **kwargs)
  File \"/var/lang/lib/python3.9/site-packages/dbt/cli/requires.py\", line 71, in wrapper
    return func(*args, **kwargs)
  File \"/var/lang/lib/python3.9/site-packages/dbt/cli/requires.py\", line 142, in wrapper
    return func(*args, **kwargs)
  File \"/var/lang/lib/python3.9/site-packages/dbt/cli/requires.py\", line 168, in wrapper
    return func(*args, **kwargs)
  File \"/var/lang/lib/python3.9/site-packages/dbt/cli/requires.py\", line 215, in wrapper
    return func(*args, **kwargs)
  File \"/var/lang/lib/python3.9/site-packages/dbt/cli/requires.py\", line 242, in wrapper
    manifest = ManifestLoader.get_full_manifest(
  File \"/var/lang/lib/python3.9/site-packages/dbt/parser/manifest.py\", line 225, in get_full_manifest
    loader = cls(config, projects, macro_hook=macro_hook, file_diff=file_diff)
  File \"/var/lang/lib/python3.9/site-packages/dbt/parser/manifest.py\", line 166, in __init__
    self.manifest: Manifest = Manifest()
  File \"<string>\", line 25, in __init__
  File \"/var/lang/lib/python3.9/multiprocessing/context.py\", line 68, in Lock
    return Lock(ctx=self.get_context())
  File \"/var/lang/lib/python3.9/multiprocessing/synchronize.py\", line 162, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File \"/var/lang/lib/python3.9/multiprocessing/synchronize.py\", line 57, in __init__
    sl = self._semlock = _multiprocessing.SemLock(
OSError: [Errno 38] Function not implemented
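
For reference, the invocation I’m testing follows the documented pattern (a sketch; the project paths are placeholders):

from dbt.cli.main import dbtRunner, dbtRunnerResult

def handler(event, context):
    runner = dbtRunner()
    # dbt >= 1.5 still constructs multiprocessing locks while loading the
    # manifest, so on Lambda this dies with the SemLock error above
    res: dbtRunnerResult = runner.invoke([
        "run",
        "--project-dir", "/var/task/dbt_project",
        "--profiles-dir", "/var/task/dbt_project",
    ])
    return {"success": res.success}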

Hello! I also have the same issue when trying to run dbt on a Databricks cluster. The dbt command itself runs smoothly, but dbt debug gives the error mentioned in this thread.

Given this thread, it seems safe to say that dbt cannot currently be run on AWS Lambda? Is the same true for Azure Functions? It might be useful to document this somewhere: dbt seems perfect for ELT orchestration built on the kind of ephemeral serverless model that Lambda/Functions promote, but at least for AWS this appears to be a showstopper, unless I’m misreading?

Maybe the method suggested by AWS can be used to overcome this for Lambdas? https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/ In short, you can use multiprocessing.Pipe instead of multiprocessing.Queue to accomplish what you need without getting any errors during execution of the Lambda function.
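
Roughly, the pattern from that post looks like this (a sketch; the worker and event shape are illustrative, and it avoids SemLock-backed Queue/Pool but doesn’t by itself fix dbt’s internal locks):

from multiprocessing import Process, Pipe

def worker(conn, item):
    # child process does its share of the work and sends a small result back
    conn.send(item * 2)
    conn.close()

def handler(event, context):
    processes, parent_conns = [], []
    for item in event.get("items", []):
        parent_conn, child_conn = Pipe()
        parent_conns.append(parent_conn)
        p = Process(target=worker, args=(child_conn, item))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    # assumes results are small enough not to fill the pipe buffer before join
    return [conn.recv() for conn in parent_conns]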

I think it’s due to the combination of AWS Lambda and DBT

That sounds right to me! Unfortunately, AWS Lambda does not support Python multiprocessing in the way dbt uses it. (https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/)

Any way to bypass it? Maybe by running dbt in a single-process fashion?
