Files
paperless-ngx/src/documents/management/commands/document_consumer.py

92 lines
2.7 KiB
Python
Raw Normal View History

import logging
2015-12-20 19:23:33 +00:00
import os
from django.conf import settings
2020-11-01 23:07:54 +01:00
from django.core.management.base import BaseCommand
2020-11-16 18:26:54 +01:00
from django_q.tasks import async_task
2020-11-01 23:07:54 +01:00
from watchdog.events import FileSystemEventHandler
2020-11-12 21:09:45 +01:00
from watchdog.observers import Observer
2020-11-16 18:52:13 +01:00
from watchdog.observers.polling import PollingObserver
2016-01-23 02:33:29 +00:00
2018-05-11 14:01:21 +02:00
try:
from inotify_simple import INotify, flags
except ImportError:
2018-09-02 20:33:49 +01:00
INotify = flags = None
2018-05-11 14:01:21 +02:00
2016-01-23 02:33:29 +00:00
2020-11-01 23:07:54 +01:00
class Handler(FileSystemEventHandler):
    """Watchdog event handler that queues files for document consumption.

    Both newly created files and files moved/renamed inside the watched
    tree are handed to the ``documents.tasks.consume_file`` task.
    """

    def _consume(self, file):
        """Queue ``file`` for consumption if it is a regular file.

        Paths that are not files (e.g. directories, or paths that have
        already disappeared) are ignored. Any error raised while queueing
        the task is logged rather than propagated.
        """
        if not os.path.isfile(file):
            return

        try:
            async_task("documents.tasks.consume_file",
                       file,
                       # The task name is the file name, cut to 100 chars.
                       task_name=os.path.basename(file)[:100])
        except Exception as e:
            # Catch all so that the consumer won't crash.
            logging.getLogger(__name__).error(
                "Error while consuming document: {}".format(e))

    def on_created(self, event):
        """Handle a file-created event by queueing the new path."""
        self._consume(event.src_path)

    def on_moved(self, event):
        """Handle a moved/renamed event by queueing the destination path.

        After a move the file lives at ``dest_path``; ``src_path`` no
        longer exists, so checking it would never find a file to consume.
        """
        self._consume(event.dest_path)
2020-11-01 23:07:54 +01:00
class Command(BaseCommand):
    """
    Queue every file already present in the consumption directory, then
    watch that directory and queue files as they appear.
    """

    def __init__(self, *args, **kwargs):
        self.verbosity = 0
        self.logger = logging.getLogger(__name__)

        super().__init__(*args, **kwargs)

    def add_arguments(self, parser):
        """Register the optional positional ``directory`` argument."""
        parser.add_argument(
            "directory",
            default=settings.CONSUMPTION_DIR,
            nargs="?",
            help="The consumption directory."
        )

    def handle(self, *args, **options):
        """Run the consumer until the observer stops or Ctrl-C is pressed.

        Raises:
            CommandError: if no consumption directory is configured or the
                given path is not an existing directory.
        """
        # Local import: only BaseCommand is imported at module level.
        from django.core.management.base import CommandError

        self.verbosity = options["verbosity"]
        directory = options["directory"]

        # os.scandir(None) would silently list the current working
        # directory, so refuse to run without an explicit, existing path.
        if not directory:
            raise CommandError(
                "The consumption directory does not appear to be set.")
        if not os.path.isdir(directory):
            raise CommandError(
                "Consumption directory {} does not exist".format(directory))

        self.logger.info("Starting document consumer at %s", directory)

        # Consume all files as this is not done initially by the watchdog.
        # NOTE(review): this initial pass only looks at the top level, while
        # the observer below watches recursively -- confirm this is intended.
        with os.scandir(directory) as entries:
            for entry in entries:
                if entry.is_file():
                    async_task("documents.tasks.consume_file",
                               entry.path,
                               task_name=os.path.basename(entry.path)[:100])

        # Start the watchdog. Woof!
        if settings.CONSUMER_POLLING > 0:
            self.logger.info(
                "Using polling instead of file system notifications.")
            observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
        else:
            observer = Observer()

        event_handler = Handler()
        observer.schedule(event_handler, directory, recursive=True)
        observer.start()

        try:
            # Join with a timeout so KeyboardInterrupt is noticed promptly.
            while observer.is_alive():
                observer.join(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()