Skip to content

Commit

Permalink
Fix MyPy errors in leveldb (#20222)
Browse files Browse the repository at this point in the history
Part of #19891
  • Loading branch information
potiuk committed Dec 15, 2021
1 parent de36616 commit c4b3694
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 15 deletions.
30 changes: 23 additions & 7 deletions airflow/providers/google/leveldb/hooks/leveldb.py
Expand Up @@ -23,6 +23,8 @@
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook

DB_NOT_INITIALIZED_BEFORE = "The `get_conn` method should be called before!"


class LevelDBHookException(AirflowException):
"""Exception specific for LevelDB"""
Expand All @@ -43,7 +45,7 @@ def __init__(self, leveldb_conn_id: str = default_conn_name):
super().__init__()
self.leveldb_conn_id = leveldb_conn_id
self.connection = self.get_connection(leveldb_conn_id)
self.db = None
self.db: Optional[plyvel.DB] = None

def get_conn(self, name: str = '/tmp/testdb/', create_if_missing: bool = False, **kwargs) -> DB:
"""
Expand Down Expand Up @@ -74,9 +76,9 @@ def run(
self,
command: str,
key: bytes,
value: bytes = None,
keys: List[bytes] = None,
values: List[bytes] = None,
value: Optional[bytes] = None,
keys: Optional[List[bytes]] = None,
values: Optional[List[bytes]] = None,
) -> Optional[bytes]:
"""
Execute operation with leveldb
Expand All @@ -87,21 +89,27 @@ def run(
:param key: key for command(put,get,delete) execution(, e.g. ``b'key'``, ``b'another-key'``)
:type key: bytes
:param value: value for command(put) execution(bytes, e.g. ``b'value'``, ``b'another-value'``)
:type value: bytes
:type value: Optional[bytes]
:param keys: keys for command(write_batch) execution(List[bytes], e.g. ``[b'key', b'another-key'])``
:type keys: List[bytes]
:type keys: Optional[List[bytes]]
:param values: values for command(write_batch) execution e.g. ``[b'value'``, ``b'another-value']``
:type values: List[bytes]
:type values: Optional[List[bytes]]
:returns: value from get or None
:rtype: Optional[bytes]
"""
if command == 'put':
if not value:
raise Exception("Please provide `value`!")
return self.put(key, value)
elif command == 'get':
return self.get(key)
elif command == 'delete':
return self.delete(key)
elif command == 'write_batch':
if not keys:
raise Exception("Please provide `keys`!")
if not values:
raise Exception("Please provide `values`!")
return self.write_batch(keys, values)
else:
raise LevelDBHookException("Unknown command for LevelDB hook")
Expand All @@ -115,6 +123,8 @@ def put(self, key: bytes, value: bytes):
:param value: value for put execution e.g. ``b'value'``, ``b'another-value'``
:type value: bytes
"""
if not self.db:
raise Exception(DB_NOT_INITIALIZED_BEFORE)
self.db.put(key, value)

def get(self, key: bytes) -> bytes:
Expand All @@ -126,6 +136,8 @@ def get(self, key: bytes) -> bytes:
:returns: value of key from db.get
:rtype: bytes
"""
if not self.db:
raise Exception(DB_NOT_INITIALIZED_BEFORE)
return self.db.get(key)

def delete(self, key: bytes):
Expand All @@ -135,6 +147,8 @@ def delete(self, key: bytes):
:param key: key for delete execution, e.g. ``b'key'``, ``b'another-key'``
:type key: bytes
"""
if not self.db:
raise Exception(DB_NOT_INITIALIZED_BEFORE)
self.db.delete(key)

def write_batch(self, keys: List[bytes], values: List[bytes]):
Expand All @@ -146,6 +160,8 @@ def write_batch(self, keys: List[bytes], values: List[bytes]):
:param values: values for write_batch execution e.g. ``[b'value', b'another-value']``
:type values: List[bytes]
"""
if not self.db:
raise Exception(DB_NOT_INITIALIZED_BEFORE)
with self.db.write_batch() as batch:
for i, key in enumerate(keys):
batch.put(key, values[i])
16 changes: 8 additions & 8 deletions airflow/providers/google/leveldb/operators/leveldb.py
Expand Up @@ -34,11 +34,11 @@ class LevelDBOperator(BaseOperator):
:param key: key for command(put,get,delete) execution(, e.g. ``b'key'``, ``b'another-key'``)
:type key: bytes
:param value: value for command(put) execution(bytes, e.g. ``b'value'``, ``b'another-value'``)
:type value: bytes
:type value: Optional[bytes]
:param keys: keys for command(write_batch) execution(List[bytes], e.g. ``[b'key', b'another-key'])``
:type keys: List[bytes]
:type keys: Optional[List[bytes]]
:param values: values for command(write_batch) execution e.g. ``[b'value'``, ``b'another-value']``
:type values: List[bytes]
:type values: Optional[List[bytes]]
:param leveldb_conn_id:
:type leveldb_conn_id: str
:param create_if_missing: whether a new database should be created if needed
Expand All @@ -53,9 +53,9 @@ def __init__(
*,
command: str,
key: bytes,
value: bytes = None,
keys: List[bytes] = None,
values: List[bytes] = None,
value: Optional[bytes] = None,
keys: Optional[List[bytes]] = None,
values: Optional[List[bytes]] = None,
leveldb_conn_id: str = 'leveldb_default',
name: str = '/tmp/testdb/',
create_if_missing: bool = True,
Expand Down Expand Up @@ -94,5 +94,5 @@ def execute(self, context) -> Optional[str]:
)
self.log.info("Done. Returned value was: %s", str(value))
leveldb_hook.close_conn()
value = value if value is None else value.decode()
return value
str_value = value if value is None else value.decode()
return str_value

0 comments on commit c4b3694

Please sign in to comment.