Saturday, September 4, 2010

Transactions for File Transfer

Transactions

Database transactions are a convenient way to maintain consistent state during data processing functions. If an error occurs during processing, you can roll back the transaction so that incomplete or incorrect data is never stored.
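
As a quick illustration of the database side, here is a minimal sketch using Python's built-in sqlite3 module. The `files` table and the helper names `make_db`, `import_rows` and `count_rows` are made up for this example; the point is only that a failure partway through leaves no partial rows behind.

```python
import sqlite3


def make_db():
    """Create an in-memory database with a hypothetical 'files' table."""
    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE files (name TEXT, size INTEGER)')
    conn.commit()
    return conn


def import_rows(conn, rows):
    """Insert all rows, or none of them if any row is invalid."""
    try:
        for name, size in rows:
            if size < 0:
                raise ValueError('invalid size for %s' % name)
            conn.execute(
                'INSERT INTO files (name, size) VALUES (?, ?)', (name, size))
        conn.commit()
    except Exception:
        # Discard every insert made since the last commit
        conn.rollback()
        raise


def count_rows(conn):
    """Return the number of rows currently stored in 'files'."""
    return conn.execute('SELECT COUNT(*) FROM files').fetchone()[0]
```

If the second row of a batch is invalid, the first row of that batch is discarded along with it, so the table only ever holds complete batches.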

Problem

I've worked on many problems where data processing involves retrieving a source file, performing some type of processing, and then writing to a destination file. These functions are tricky, because if a problem arises during the processing, you're left with an inconsistent, partially processed batch of files. This problem is especially pronounced if you're storing file metadata in a database. If you perform a rollback of your database transaction when an error occurs, then you've lost any updated metadata about the files that were processed correctly.
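
To make the problem concrete, here is a small sketch of a naive batch job, with made-up file names and a made-up "processing" step (upper-casing the text). It fails on the third file and leaves the first two outputs behind.

```python
import os
import shutil
import tempfile


def process_batch(src_dir, dest_dir, names):
    """Naively transform each file; nothing is cleaned up on failure."""
    for name in names:
        with open(os.path.join(src_dir, name)) as src:
            data = src.read()
        if not data:
            raise ValueError('empty source file: %s' % name)
        with open(os.path.join(dest_dir, name), 'w') as dest:
            dest.write(data.upper())


def demo():
    """Run a batch that fails partway and report what was left behind."""
    src_dir = tempfile.mkdtemp()
    dest_dir = tempfile.mkdtemp()
    contents = {'a.txt': 'alpha', 'b.txt': 'beta', 'c.txt': '', 'd.txt': 'delta'}
    try:
        for name, text in contents.items():
            with open(os.path.join(src_dir, name), 'w') as f:
                f.write(text)
        try:
            process_batch(src_dir, dest_dir, sorted(contents))
        except ValueError:
            pass  # the batch died on c.txt
        return sorted(os.listdir(dest_dir))
    finally:
        # Clean up the temporary directories used by the demo
        shutil.rmtree(src_dir)
        shutil.rmtree(dest_dir)
```

Calling `demo()` returns the outputs for a.txt and b.txt only: the destination directory is half populated, and nothing records which files still need work.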

Solution

In an attempt to remedy this problem I've developed a somewhat naive implementation of a file transaction class that can be used to maintain consistent state during processing functions that involve many files. The transaction object keeps track of all files that have been created and all files that should be deleted. All files marked for deletion are deleted when a commit occurs. All files marked as created are removed when a rollback occurs. If a file needs to be moved, it can instead be copied, with the source file marked for deletion and the destination file marked as created (this is what copy_file does when remove_existing is True). The class also has a move_file method, which moves the file immediately and records the move so that a rollback can reverse it.

Implementation


import os
import shutil


class TransactionError(Exception):
    """Raised on transaction misuse or lock conflicts.

    The original code referenced exceptions.TransactionError from a
    module that is not shown; it is defined here so the listing runs.
    """


class Transaction(object):
    """
    Manages transactions for file storage.

    Assumes each file is operated on by only one user at a time.

    If multiple users try to operate on the same file, the last
    one to access it gets an exception.
    """

    lock_postfix = 't_lock'

    def __init__(self):
        # Nesting depth: 0 means no transaction is active
        self._level = 0

    def _get_lock_path(self, path):
        """Return lock file path."""

        # Drop one trailing slash so a directory gets a sibling lock file
        if path.endswith('/'):
            path = path[:-1]

        return path + '.%s' % self.lock_postfix

    def _set_files(self):
        """Resets file lists."""

        self._files_added = set()
        self._files_removed = set()
        self._dirs_added = set()
        self._dirs_removed = set()
        self._locked_files = set()

        # Unlike the other types,
        # move operations
        # must be ordered!!
        self._files_moved = []

    def _check_level(self):
        """Raises exception if level is not 1 or above."""

        if self._level < 1:
            raise TransactionError('Transaction not active.')

    def _rm(self, file_paths, dir_paths):
        """Remove all files."""

        for dir_path in dir_paths:
            if os.path.exists(dir_path):
                shutil.rmtree(dir_path)

        for file_path in file_paths:
            if os.path.exists(file_path):
                os.unlink(file_path)

    def _rev_moves(self):
        """Reverse moved files."""

        # Undo the most recent move first
        for src_path, dest_path in reversed(self._files_moved):
            shutil.move(dest_path, src_path)

    def _acquire_lock(self, path):
        """Attempt to lock a file."""

        # Make sure transaction is started
        self._check_level()

        if path not in self._locked_files:
            # Create lock file on file system
            lock_path = self._get_lock_path(path)
            if os.path.exists(lock_path):
                # Multi-user access is not allowed!
                raise TransactionError('File is locked.')
            with open(lock_path, 'w') as out_file:
                out_file.write('\n')
            self._locked_files.add(path)

    def _release_lock(self, path):
        """Release a lock file."""

        lock_path = self._get_lock_path(path)
        if os.path.exists(lock_path):
            os.unlink(lock_path)
        self._locked_files.discard(path)

    def _release_locks(self):
        """Release all locks."""

        # Iterate over a copy, because _release_lock mutates the set
        locked_paths = self._locked_files.copy()
        for path in locked_paths:
            self._release_lock(path)

    def copy_file(self, src_path, dest_path, remove_existing=False, directory=False):
        """Copy a file. Set remove_existing to True to move file."""

        # Check the transaction and lock the destination before
        # touching the file system
        self._acquire_lock(dest_path)

        if directory is True:
            shutil.copytree(src_path, dest_path, symlinks=True)
        else:
            shutil.copyfile(src_path, dest_path)
        self.add_file(dest_path, directory=directory)

        if remove_existing is True:
            self.remove_file(src_path, directory=directory)

    def add_file(self, path, directory=None):
        """Add a file to the transaction."""

        self._check_level()

        self._acquire_lock(path)

        # Detect directories automatically unless told otherwise
        if directory is None:
            directory = os.path.isdir(path)

        if directory is True:
            self._dirs_added.add(path)
        else:
            self._files_added.add(path)

    def remove_file(self, path, directory=None):
        """Mark a file for removal when the transaction commits."""

        self._check_level()

        self._acquire_lock(path)

        # Detect directories automatically unless told otherwise
        if directory is None:
            directory = os.path.isdir(path)

        if directory is True:
            self._dirs_removed.add(path)
        else:
            self._files_removed.add(path)

    def move_file(self, src_path, dest_path):
        """Move a file from one location to another."""

        self._check_level()

        self._acquire_lock(src_path)
        self._acquire_lock(dest_path)

        # The move happens immediately; rollback reverses it
        shutil.move(src_path, dest_path)
        self._files_moved.append((src_path, dest_path))

    def begin(self):
        """Begin transaction."""

        # Only the outermost begin resets the bookkeeping
        if self._level == 0:
            self._set_files()

        self._level += 1

    def commit(self):
        """Removes all 'removed' files and dirs."""

        self._check_level()

        # Only the outermost commit touches the file system
        self._level -= 1
        if self._level == 0:
            self._rm(self._files_removed, self._dirs_removed)
            self._release_locks()

    def rollback(self):
        """Removes all 'added' files and dirs."""

        self._check_level()

        # Only the outermost rollback touches the file system
        self._level -= 1
        if self._level == 0:
            self._rm(self._files_added, self._dirs_added)
            self._rev_moves()
            self._release_locks()


Example


def process(new_file, delete_file, src_file, dest_file,
            mov_src_file, mov_dest_file):
    transaction = Transaction()
    transaction.begin()
    try:
        # Mark a file as created
        transaction.add_file(new_file)

        # Mark a file as deleted
        transaction.remove_file(delete_file)

        # Copy a file
        transaction.copy_file(src_file, dest_file)

        # Move a file
        transaction.move_file(mov_src_file, mov_dest_file)
        transaction.commit()
    except:
        # Undo the work on any error, then let the error propagate
        transaction.rollback()
        raise
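
The begin/try/commit/except/rollback pattern above can also be wrapped in a context manager so callers can't forget the rollback. This is only a sketch, not part of the original class: `transaction_scope` and `RecordingTransaction` are names I made up, and the helper assumes nothing more than an object with begin(), commit() and rollback() methods.

```python
import contextlib


@contextlib.contextmanager
def transaction_scope(transaction):
    """Commit if the with-block succeeds, roll back if it raises."""
    transaction.begin()
    try:
        yield transaction
    except BaseException:
        # Undo the work on any error, then let the error propagate
        transaction.rollback()
        raise
    else:
        transaction.commit()


class RecordingTransaction(object):
    """Hypothetical stand-in that only records which methods were called."""

    def __init__(self):
        self.calls = []

    def begin(self):
        self.calls.append('begin')

    def commit(self):
        self.calls.append('commit')

    def rollback(self):
        self.calls.append('rollback')
```

With the real class, usage would look like `with transaction_scope(Transaction()) as t: t.add_file(path)`. One difference from the example above: here the commit runs outside the try block, so an error raised by commit itself does not trigger a rollback.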


Limitations

The class only works in single-user environments. A lock file is created for every file that is added to, removed from, or moved within a transaction. If a different transaction tries to acquire a lock for a file that is already locked, an exception is raised. Negotiating multi-user access would be quite tricky, especially in the case of deleted files, where the file no longer exists after the lock is released.
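
One related weak spot: the lock is taken by checking whether the lock file exists and then creating it, so two processes could both pass the check before either writes the file. A possible hardening, sketched below, is to let the operating system do the check and the creation in one step with os.open and the O_CREAT | O_EXCL flags. The names `LockError`, `acquire_lock` and `release_lock` are made up for this sketch, and I'm assuming a local file system; it does not address the harder multi-user negotiation described above.

```python
import errno
import os


class LockError(Exception):
    """Hypothetical error raised when a lock file already exists."""


def acquire_lock(lock_path):
    """Create the lock file atomically, or raise LockError if it exists."""
    try:
        # O_EXCL makes the call fail if the file is already there, so the
        # existence check and the creation happen as a single operation
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except OSError as exc:
        if exc.errno == errno.EEXIST:
            raise LockError('File is locked: %s' % lock_path)
        raise
    os.close(fd)


def release_lock(lock_path):
    """Remove the lock file; a missing lock file is not an error."""
    try:
        os.unlink(lock_path)
    except OSError as exc:
        if exc.errno != errno.ENOENT:
            raise
```

A second call to `acquire_lock` on the same path raises LockError until `release_lock` has removed the file.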
