diff --git a/dspace-api/src/main/java/org/dspace/curate/CurationQueueEntry.java b/dspace-api/src/main/java/org/dspace/curate/CurationQueueEntry.java new file mode 100644 index 000000000000..16cac5b3b785 --- /dev/null +++ b/dspace-api/src/main/java/org/dspace/curate/CurationQueueEntry.java @@ -0,0 +1,173 @@ +/** + * The contents of this file are subject to the license and copyright + * detailed in the LICENSE and NOTICE files at the root of the source + * tree and available online at + * + * http://www.dspace.org/license/ + */ +package org.dspace.curate; + +import java.util.Objects; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Index; +import jakarta.persistence.Table; + +/** + * CurationQueueEntry is the persistent database representation of a queued curation task entry. + * It mirrors the value object {@link TaskQueueEntry} but adds persistence specific fields (id). + * + * @author Stefano Maffei (stefano.maffei at 4science.com) + */ +@Entity +@Table(name = "curation_task_queue", indexes = { + @Index(name = "idx_ctq_queue", columnList = "queue_name") +}) +public class CurationQueueEntry { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Integer id; + + @Column(name = "queue_name", nullable = false) + private String queueName; + + @Column(name = "eperson", nullable = false) + private String epersonId; + + @Column(name = "submit_time", nullable = false) + private long submitTime; + + @Column(name = "tasks", nullable = false) + private String tasks; + + @Column(name = "object_id") + private String objectId; + + /** Default constructor required by JPA. */ + protected CurationQueueEntry() { + // default + } + + /** Convenience constructor. */ + public CurationQueueEntry(String queueName, TaskQueueEntry vo) { + this.queueName = queueName; + this.epersonId = vo.getEpersonId(); + this.submitTime = vo.getSubmitTime(); + this.tasks = String.join(",", vo.getTaskNames()); + this.objectId = vo.getObjectId(); + } + /** + * Gets the unique identifier of this queue entry. + * @return the id + */ + public Integer getId() { + return id; + } + + /** + * Gets the queue name. + * @return the queue name + */ + public String getQueueName() { + return queueName; + } + + /** + * Sets the queue name. + * @param queueName the queue name to set + */ + public void setQueueName(String queueName) { + this.queueName = queueName; + } + + /** + * Gets the eperson identifier. + * @return the eperson id + */ + public String getEpersonId() { + return epersonId; + } + + /** + * Sets the eperson identifier. + * @param epersonId the eperson id to set + */ + public void setEpersonId(String epersonId) { + this.epersonId = epersonId; + } + + /** + * Gets the submit time. + * @return the submit time + */ + public long getSubmitTime() { + return submitTime; + } + + /** + * Sets the submit time. + * @param submitTime the submit time to set + */ + public void setSubmitTime(long submitTime) { + this.submitTime = submitTime; + } + + /** + * Gets the tasks as a comma separated string. + * @return the tasks + */ + public String getTasks() { + return tasks; + } + + /** + * Sets the tasks as a comma separated string. + * @param tasks the tasks to set + */ + public void setTasks(String tasks) { + this.tasks = tasks; + } + + /** + * Gets the object identifier. + * @return the object id + */ + public String getObjectId() { + return objectId; + } + + /** + * Sets the object identifier. + * @param objectId the object id to set + */ + public void setObjectId(String objectId) { + this.objectId = objectId; + } + + /** Rebuild the immutable value object representation. */ + public TaskQueueEntry toTaskQueueEntry() { + return new TaskQueueEntry(epersonId, String.valueOf(submitTime), tasks, objectId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof CurationQueueEntry)) { + return false; + } + CurationQueueEntry that = (CurationQueueEntry) o; + return Objects.equals(id, that.id); + } + + @Override + public int hashCode() { + return Objects.hashCode(id); + } +} diff --git a/dspace-api/src/main/java/org/dspace/curate/CurationQueueLock.java b/dspace-api/src/main/java/org/dspace/curate/CurationQueueLock.java new file mode 100644 index 000000000000..e0537ac3b094 --- /dev/null +++ b/dspace-api/src/main/java/org/dspace/curate/CurationQueueLock.java @@ -0,0 +1,128 @@ +/** + * The contents of this file are subject to the license and copyright + * detailed in the LICENSE and NOTICE files at the root of the source + * tree and available online at + * + * http://www.dspace.org/license/ + */ +package org.dspace.curate; + +import java.util.Date; +import java.util.Objects; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Index; +import jakarta.persistence.Table; +import jakarta.persistence.Temporal; +import jakarta.persistence.TemporalType; + +/** + * Represents an exclusive lock on a curation queue. + * It is used to prevent multiple processes from working on the same queue simultaneously. + *

+ * The lock is associated with a queue name and includes a unique token (the ticket), + * when it was acquired and by which process (or worker). + * + * @author Stefano Maffei (stefano.maffei at 4science.com) + */ +@Entity +@Table(name = "curation_queue_lock", indexes = { + @Index(name = "idx_cql_queue_name", columnList = "queue_name", unique = true) +}) +public class CurationQueueLock { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Integer id; + + @Column(name = "queue_name", nullable = false, unique = true, length = 128) + private String queueName; + + @Column(name = "ticket", nullable = false) + private long ticket; + + @Column(name = "owner_id", length = 256) + private String ownerId; + + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "lock_date", nullable = false) + private Date lockDate; + + /** + * Default constructor required by JPA. + */ + protected CurationQueueLock() { + // default + } + + /** + * Main constructor for creating a new lock. + * + * @param queueName Name of the queue to lock + * @param ticket Unique token for this lock + * @param ownerId Identifier of the process/worker owning the lock + */ + public CurationQueueLock(String queueName, long ticket, String ownerId) { + this.queueName = queueName; + this.ticket = ticket; + this.ownerId = ownerId; + this.lockDate = new Date(); + } + + public Integer getId() { + return id; + } + + public String getQueueName() { + return queueName; + } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } + + public long getTicket() { + return ticket; + } + + public void setTicket(long ticket) { + this.ticket = ticket; + } + + public String getOwnerId() { + return ownerId; + } + + public void setOwnerId(String ownerId) { + this.ownerId = ownerId; + } + + public Date getLockDate() { + return lockDate; + } + + public void setLockDate(Date lockDate) { + this.lockDate = lockDate; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof CurationQueueLock)) { + return false; + } + CurationQueueLock that = (CurationQueueLock) o; + return Objects.equals(id, that.id); + } + + @Override + public int hashCode() { + return Objects.hashCode(id); + } +} diff --git a/dspace-api/src/main/java/org/dspace/curate/DBTaskQueue.java b/dspace-api/src/main/java/org/dspace/curate/DBTaskQueue.java new file mode 100644 index 000000000000..3abc16a1e3eb --- /dev/null +++ b/dspace-api/src/main/java/org/dspace/curate/DBTaskQueue.java @@ -0,0 +1,242 @@ +/** + * The contents of this file are subject to the license and copyright + * detailed in the LICENSE and NOTICE files at the root of the source + * tree and available online at + * + * http://www.dspace.org/license/ + */ +package org.dspace.curate; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.dspace.core.Context; +import org.dspace.curate.dao.CurationQueueEntryDAO; +import org.dspace.curate.dao.CurationQueueLockDAO; +import org.dspace.curate.dao.impl.CurationQueueEntryDAOImpl; +import org.dspace.curate.dao.impl.CurationQueueLockDAOImpl; +import org.dspace.utils.DSpace; +import org.hibernate.exception.ConstraintViolationException; + +/** + * DBTaskQueue provides a TaskQueue implementation backed by a relational database + * (Hibernate/JPA) instead of the original flat-file mechanism. Each enqueued task + * is persisted as a row in the {@code curation_task_queue} table (see entity {@link CurationQueueEntry}). + * + * Concurrency / locking model: + * - Writers simply insert new rows into curation_task_queue table + * - A single reader claims exclusive access to a queue by inserting a row in the curation_queue_lock table + * with a unique ticket value. The uniqueness constraint on the queue_name ensures only one process + * can claim a queue at a time. + * - After claiming, the reader holds an in-memory reference to the ticket used to acquire the lock. + * This ticket must be presented to release the lock. + * - On release: the lock row is deleted, allowing other processes to claim the queue. + * - Only the entries that were retrieved during dequeue will be removed during release, + * not any new entries that were added while processing. + * + * NOTE: This implementation assumes the caller supplies a reasonably unique ticket (e.g. System.currentTimeMillis()). + * + * Format parity: The persistent data mirrors the pipe-separated value used by {@link TaskQueueEntry}: + * epersonId|submitTime|tasks(comma separated)|objectId + * + * This class purposefully manages its own short-lived {@link Context} instances because the TaskQueue + * interface does not expose a Context parameter. + * + * @author Stefano Maffei (stefano.maffei at 4science.com) (DB refactor) + */ +public class DBTaskQueue implements TaskQueue { + private static final Logger log = LogManager.getLogger(TaskQueue.class); + + // ticket of the currently held lock (or -1L if no lock is held) + private long readTicket = -1L; + + // name of the currently locked queue (or null if no lock is held) + private String lockedQueueName = null; + + // IDs of entries that were dequeued and need to be deleted on release + private Set dequeuedEntryIds = new HashSet<>(); + + // Data access objects + private final CurationQueueEntryDAO queueEntryDAO; + private final CurationQueueLockDAO queueLockDAO; + + public DBTaskQueue() { + queueEntryDAO = new DSpace().getSingletonService(CurationQueueEntryDAOImpl.class); + queueLockDAO = new DSpace().getSingletonService(CurationQueueLockDAOImpl.class); + } + + /** + * Return distinct queue names existing in the DB. + */ + @Override + public String[] queueNames() { + try (Context context = new Context()) { + List names = queueEntryDAO.findAllQueueNames(context); + return names.toArray(new String[0]); + } catch (SQLException e) { + log.error("SQL error retrieving queue names: {}", e.getMessage(), e); + return new String[0]; + } catch (Exception e) { + log.error("Error retrieving queue names: {}", e.getMessage(), e); + return new String[0]; + } + } + + /** + * Enqueue a single task entry by delegating to the set-based method. + */ + @Override + public synchronized void enqueue(String queueName, TaskQueueEntry entry) throws IOException { + Set set = new HashSet<>(); + set.add(entry); + enqueue(queueName, set); + } + + /** + * Persist a set of task entries for the specified queue. + * This operation can be performed even when the queue is being processed, + * allowing new tasks to be added while others are in progress. + */ + @Override + public synchronized void enqueue(String queueName, Set entrySet) throws IOException { + if (entrySet == null || entrySet.isEmpty()) { + return; // nothing to do + } + try (Context context = new Context()) { + for (TaskQueueEntry entry : entrySet) { + CurationQueueEntry entity = new CurationQueueEntry(queueName, entry); + queueEntryDAO.create(context, entity); + } + context.commit(); + } catch (SQLException sqle) { + log.error("SQL error while enqueuing tasks for queue '{}': {}", queueName, sqle.getMessage(), sqle); + throw new IOException("Database error enqueuing tasks", sqle); + } catch (ConstraintViolationException e) { + log.error("Violating constraint {}", e.getConstraintName()); + } catch (Exception e) { + log.error("Unexpected error while enqueuing tasks for queue '{}': {}", queueName, e.getMessage(), e); + throw new IOException("Unexpected error enqueuing tasks", e); + } + } + + /** + * Claim (dequeue) all entries for the queue by acquiring an exclusive lock via the curation_queue_lock table. + * Returns the entries as immutable value objects. Subsequent dequeue calls with a different ticket are ignored + * until release is invoked. + * The IDs of the dequeued entries are stored internally so that only these specific entries + * will be removed when release(remove=true) is called. + */ + @Override + public synchronized Set dequeue(String queueName, long ticket) throws IOException { + // Reset the stored entry IDs from any previous dequeue operation + dequeuedEntryIds.clear(); + + if (readTicket != -1L) { + // already holding a lock + return Collections.emptySet(); + } + if (queueName == null) { + return Collections.emptySet(); + } + + try (Context context = new Context()) { + // First check if the queue is already locked by another process + if (queueLockDAO.isQueueLocked(context, queueName)) { + log.debug("Queue '{}' is already locked by another process", queueName); + return Collections.emptySet(); + } + + // Get the number of items in the queue before acquiring the lock + long entryCount = queueEntryDAO.countByQueueName(context, queueName); + if (entryCount == 0) { + // No entries to process + log.debug("Queue '{}' has no entries to process", queueName); + return Collections.emptySet(); + } + + // Create a new lock for this queue + CurationQueueLock lock = new CurationQueueLock(queueName, ticket, + "process-" + Thread.currentThread().getId()); + queueLockDAO.create(context, lock); + + // If we get here, the lock was successfully created + readTicket = ticket; + lockedQueueName = queueName; + + // Fetch all entries for this queue + List entries = queueEntryDAO.findByQueueName(context, queueName); + + // Store the IDs of entries that we're returning for processing + dequeuedEntryIds = entries.stream() + .map(CurationQueueEntry::getId) + .collect(Collectors.toSet()); + + // Convert to task queue entries for return + Set result = new HashSet<>(); + for (CurationQueueEntry entry : entries) { + result.add(entry.toTaskQueueEntry()); + } + + // Commit the transaction to ensure the lock is persisted + context.commit(); + return result; + } catch (SQLException sqle) { + log.error("SQL error while dequeuing tasks for queue '{}': {}", queueName, sqle.getMessage(), sqle); + // If there was an error, ensure we don't hold the lock in memory + readTicket = -1L; + lockedQueueName = null; + dequeuedEntryIds.clear(); + throw new IOException("Database error dequeuing tasks", sqle); + } catch (Exception e) { + log.error("Unexpected error while dequeuing tasks for queue '{}': {}", queueName, e.getMessage(), e); + // If there was an error, ensure we don't hold the lock in memory + readTicket = -1L; + lockedQueueName = null; + dequeuedEntryIds.clear(); + throw new IOException("Unexpected error dequeuing tasks", e); + } + } + + /** + * Release the lock on the queue. If remove=true, only the entries that were retrieved + * during the dequeue operation will be deleted, not any new entries that might have + * been added while processing. + */ + @Override + public synchronized void release(String queueName, long ticket, boolean remove) { + if (ticket != readTicket || readTicket == -1L || lockedQueueName == null || + !lockedQueueName.equals(queueName)) { + return; // nothing to release or ticket/queue mismatch + } + + try (Context context = new Context()) { + // First release the lock + boolean lockReleased = queueLockDAO.releaseLock(context, queueName, ticket); + if (!lockReleased) { + log.warn("Failed to release lock for queue '{}' with ticket {}", queueName, ticket); + } + + // If requested, delete only the entries that were dequeued, not all entries in the queue + if (remove && !dequeuedEntryIds.isEmpty()) { + int deleted = queueEntryDAO.deleteByIds(context, dequeuedEntryIds); + log.debug("Deleted {} entries from queue '{}' that were processed", deleted, queueName); + } + + context.commit(); + } catch (Exception e) { + log.error("Error releasing queue '{}' (remove={}): {}", queueName, remove, e.getMessage(), e); + } finally { + // Even in case of error, release the lock in memory + readTicket = -1L; + lockedQueueName = null; + dequeuedEntryIds.clear(); + } + } +} diff --git a/dspace-api/src/main/java/org/dspace/curate/FileTaskQueue.java b/dspace-api/src/main/java/org/dspace/curate/FileTaskQueue.java index f603fa2e9a6b..ecec8372c3c4 100644 --- a/dspace-api/src/main/java/org/dspace/curate/FileTaskQueue.java +++ b/dspace-api/src/main/java/org/dspace/curate/FileTaskQueue.java @@ -121,7 +121,13 @@ public synchronized Set dequeue(String queueName, long ticket) while ((entryStr = reader.readLine()) != null) { entryStr = entryStr.trim(); if (entryStr.length() > 0) { - entrySet.add(new TaskQueueEntry(entryStr)); + // Assuming entryStr is a delimited string, e.g., "epersonId|submitTime|tasks|objId" + String[] parts = entryStr.split("\\|"); + if (parts.length == 4) { + entrySet.add(new TaskQueueEntry(parts[0], parts[1], parts[2], parts[3])); + } else { + log.error("Malformed queue entry: " + entryStr); + } } } } finally { diff --git a/dspace-api/src/main/java/org/dspace/curate/TaskQueueEntry.java b/dspace-api/src/main/java/org/dspace/curate/TaskQueueEntry.java index 3f802b8df4c0..10b1e885cf5d 100644 --- a/dspace-api/src/main/java/org/dspace/curate/TaskQueueEntry.java +++ b/dspace-api/src/main/java/org/dspace/curate/TaskQueueEntry.java @@ -42,17 +42,20 @@ public TaskQueueEntry(String epersonId, long submitTime, this.objId = objId; } + /** - * Constructor with a pipe-separated list of field values. + * TaskQueueEntry constructor with explicit field values. * - * @param entry list of field values separated by '|'s + * @param epersonId task owner + * @param submitTime time the task was submitted (as String) + * @param tasks comma-separated list of task names + * @param objId usually a handle or workflow id */ - public TaskQueueEntry(String entry) { - String[] tokens = entry.split("\\|"); - epersonId = tokens[0]; - submitTime = tokens[1]; - tasks = tokens[2]; - objId = tokens[3]; + public TaskQueueEntry(String epersonId, String submitTime, String tasks, String objId) { + this.epersonId = epersonId; + this.submitTime = submitTime; + this.tasks = tasks; + this.objId = objId; } /** diff --git a/dspace-api/src/main/java/org/dspace/curate/dao/CurationQueueEntryDAO.java b/dspace-api/src/main/java/org/dspace/curate/dao/CurationQueueEntryDAO.java new file mode 100644 index 000000000000..20b5642b7bfd --- /dev/null +++ b/dspace-api/src/main/java/org/dspace/curate/dao/CurationQueueEntryDAO.java @@ -0,0 +1,77 @@ +/** + * The contents of this file are subject to the license and copyright + * detailed in the LICENSE and NOTICE files at the root of the source + * tree and available online at + * + * http://www.dspace.org/license/ + */ +package org.dspace.curate.dao; + +import java.sql.SQLException; +import java.util.List; +import java.util.Set; + +import org.dspace.core.Context; +import org.dspace.core.GenericDAO; +import org.dspace.curate.CurationQueueEntry; + +/** + * Database Access Object interface for CurationQueueEntry. + * This interface is responsible for all database calls for CurationQueueEntry objects and is + * autowired by Spring. It should only be accessed from a single service and should never be exposed + * outside of the API. + * + * @author Stefano Maffei (stefano.maffei at 4science.com) + */ +public interface CurationQueueEntryDAO extends GenericDAO { + + /** + * Find all distinct queue names in the database. + * + * @param context The DSpace context + * @return List of queue names + * @throws SQLException If database error + */ + List findAllQueueNames(Context context) throws SQLException; + + /** + * Find all entries for a specific queue. + * + * @param context The DSpace context + * @param queueName The name of the queue + * @return List of entries for the queue + * @throws SQLException If database error + */ + List findByQueueName(Context context, String queueName) throws SQLException; + + /** + * Count how many entries are in a queue. + * + * @param context The DSpace context + * @param queueName The name of the queue + * @return Number of entries in the queue + * @throws SQLException If database error + */ + long countByQueueName(Context context, String queueName) throws SQLException; + + /** + * Delete all entries for a specific queue. + * + * @param context The DSpace context + * @param queueName The name of the queue + * @return Number of entries deleted + * @throws SQLException If database error + */ + int deleteByQueueName(Context context, String queueName) throws SQLException; + + /** + * Delete entries by their IDs. + * This allows selective deletion of specific entries from a queue. + * + * @param context The DSpace context + * @param ids Set of entry IDs to delete + * @return Number of entries deleted + * @throws SQLException If database error + */ + int deleteByIds(Context context, Set ids) throws SQLException; +} diff --git a/dspace-api/src/main/java/org/dspace/curate/dao/CurationQueueLockDAO.java b/dspace-api/src/main/java/org/dspace/curate/dao/CurationQueueLockDAO.java new file mode 100644 index 000000000000..544d9726c7f9 --- /dev/null +++ b/dspace-api/src/main/java/org/dspace/curate/dao/CurationQueueLockDAO.java @@ -0,0 +1,65 @@ +/** + * The contents of this file are subject to the license and copyright + * detailed in the LICENSE and NOTICE files at the root of the source + * tree and available online at + * + * http://www.dspace.org/license/ + */ +package org.dspace.curate.dao; + +import java.sql.SQLException; + +import org.dspace.core.Context; +import org.dspace.core.GenericDAO; +import org.dspace.curate.CurationQueueLock; + +/** + * Database Access Object interface for CurationQueueLock. + * Manages access to data for curation queue locks. + * + * @author Stefano Maffei (stefano.maffei at 4science.com) + */ +public interface CurationQueueLockDAO extends GenericDAO { + + /** + * Find an existing lock for the specified queue. + * + * @param context DSpace context + * @param queueName Queue name + * @return The lock if it exists, otherwise null + * @throws SQLException If database error + */ + CurationQueueLock findByQueueName(Context context, String queueName) throws SQLException; + + /** + * Check if a lock exists for the specified queue. + * + * @param context DSpace context + * @param queueName Queue name + * @return true if a lock exists, false otherwise + * @throws SQLException If database error + */ + boolean isQueueLocked(Context context, String queueName) throws SQLException; + + /** + * Verify if the specified lock corresponds to the provided ticket. + * + * @param context DSpace context + * @param queueName Queue name + * @param ticket Ticket to verify + * @return true if the lock is valid for the ticket, false otherwise + * @throws SQLException If database error + */ + boolean validateLock(Context context, String queueName, long ticket) throws SQLException; + + /** + * Release (delete) the lock associated with this queue and ticket. + * + * @param context DSpace context + * @param queueName Queue name + * @param ticket Ticket associated with the lock + * @return true if the lock was successfully released, false otherwise + * @throws SQLException If database error + */ + boolean releaseLock(Context context, String queueName, long ticket) throws SQLException; +} diff --git a/dspace-api/src/main/java/org/dspace/curate/dao/impl/CurationQueueEntryDAOImpl.java b/dspace-api/src/main/java/org/dspace/curate/dao/impl/CurationQueueEntryDAOImpl.java new file mode 100644 index 000000000000..2ad11738ea51 --- /dev/null +++ b/dspace-api/src/main/java/org/dspace/curate/dao/impl/CurationQueueEntryDAOImpl.java @@ -0,0 +1,99 @@ +/** + * The contents of this file are subject to the license and copyright + * detailed in the LICENSE and NOTICE files at the root of the source + * tree and available online at + * + * http://www.dspace.org/license/ + */ +package org.dspace.curate.dao.impl; + +import java.sql.SQLException; +import java.util.List; +import java.util.Set; + +import jakarta.persistence.Query; +import jakarta.persistence.TypedQuery; +import jakarta.persistence.criteria.CriteriaBuilder; +import jakarta.persistence.criteria.CriteriaQuery; +import jakarta.persistence.criteria.Root; +import org.dspace.core.AbstractHibernateDAO; +import org.dspace.core.Context; +import org.dspace.curate.CurationQueueEntry; +import org.dspace.curate.dao.CurationQueueEntryDAO; + +/** + * Hibernate implementation of the Database Access Object for the CurationQueueEntry entity. + * This class is responsible for all database calls for the CurationQueueEntry object and is + * autowired by Spring. It should only be accessed from a single service. + * + * @author Stefano Maffei (stefano.maffei at 4science.com) + */ +public class CurationQueueEntryDAOImpl extends AbstractHibernateDAO + implements CurationQueueEntryDAO { + + protected CurationQueueEntryDAOImpl() { + super(); + } + + @Override + public List findAllQueueNames(Context context) throws SQLException { + CriteriaBuilder criteriaBuilder = getCriteriaBuilder(context); + CriteriaQuery criteriaQuery = criteriaBuilder.createQuery(String.class); + Root root = criteriaQuery.from(CurationQueueEntry.class); + + // Select distinct queue_name values + criteriaQuery.select(root.get("queueName")).distinct(true); + + TypedQuery query = getHibernateSession(context).createQuery(criteriaQuery); + return query.getResultList(); + } + + @Override + public List findByQueueName(Context context, String queueName) throws SQLException { + CriteriaBuilder criteriaBuilder = getCriteriaBuilder(context); + CriteriaQuery criteriaQuery = getCriteriaQuery(criteriaBuilder, CurationQueueEntry.class); + Root root = criteriaQuery.from(CurationQueueEntry.class); + + criteriaQuery.where(criteriaBuilder.equal(root.get("queueName"), queueName)); + TypedQuery query = getHibernateSession(context).createQuery(criteriaQuery); + return query.getResultList(); + } + + @Override + public long countByQueueName(Context context, String queueName) throws SQLException { + CriteriaBuilder criteriaBuilder = getCriteriaBuilder(context); + CriteriaQuery criteriaQuery = criteriaBuilder.createQuery(Long.class); + Root root = criteriaQuery.from(CurationQueueEntry.class); + + criteriaQuery.select(criteriaBuilder.count(root)); + criteriaQuery.where(criteriaBuilder.equal(root.get("queueName"), queueName)); + + return getHibernateSession(context).createQuery(criteriaQuery).getSingleResult(); + } + + @Override + public int deleteByQueueName(Context context, String queueName) throws SQLException { + // Use a direct query for bulk deletion + Query query = createQuery(context, + "DELETE FROM CurationQueueEntry e WHERE e.queueName = :queueName"); + query.setParameter("queueName", queueName); + + // Returns the number of rows deleted + return query.executeUpdate(); + } + + @Override + public int deleteByIds(Context context, Set ids) throws SQLException { + if (ids == null || ids.isEmpty()) { + return 0; + } + + // Use a direct query for bulk deletion by IDs + Query query = createQuery(context, + "DELETE FROM CurationQueueEntry e WHERE e.id IN :ids"); + query.setParameter("ids", ids); + + // Returns the number of rows deleted + return query.executeUpdate(); + } +} diff --git a/dspace-api/src/main/java/org/dspace/curate/dao/impl/CurationQueueLockDAOImpl.java b/dspace-api/src/main/java/org/dspace/curate/dao/impl/CurationQueueLockDAOImpl.java new file mode 100644 index 000000000000..22fcd72e3fc5 --- /dev/null +++ b/dspace-api/src/main/java/org/dspace/curate/dao/impl/CurationQueueLockDAOImpl.java @@ -0,0 +1,64 @@ +/** + * The contents of this file are subject to the license and copyright + * detailed in the LICENSE and NOTICE files at the root of the source + * tree and available online at + * + * http://www.dspace.org/license/ + */ +package org.dspace.curate.dao.impl; + +import java.sql.SQLException; +import java.util.List; + +import jakarta.persistence.criteria.CriteriaBuilder; +import jakarta.persistence.criteria.CriteriaQuery; +import jakarta.persistence.criteria.Root; +import org.dspace.core.AbstractHibernateDAO; +import org.dspace.core.Context; +import org.dspace.curate.CurationQueueLock; +import org.dspace.curate.dao.CurationQueueLockDAO; + +/** + * Hibernate implementation of the Database Access Object for the CurationQueueLock entity. + * + * @author Stefano Maffei (stefano.maffei at 4science.com) + */ +public class CurationQueueLockDAOImpl extends AbstractHibernateDAO implements CurationQueueLockDAO { + + protected CurationQueueLockDAOImpl() { + super(); + } + + @Override + public CurationQueueLock findByQueueName(Context context, String queueName) throws SQLException { + CriteriaBuilder criteriaBuilder = getCriteriaBuilder(context); + CriteriaQuery criteriaQuery = getCriteriaQuery(criteriaBuilder, CurationQueueLock.class); + Root root = criteriaQuery.from(CurationQueueLock.class); + + criteriaQuery.where(criteriaBuilder.equal(root.get("queueName"), queueName)); + + List result = list(context, criteriaQuery, false, CurationQueueLock.class, -1, -1); + return result.isEmpty() ? null : result.get(0); + } + + @Override + public boolean isQueueLocked(Context context, String queueName) throws SQLException { + return findByQueueName(context, queueName) != null; + } + + @Override + public boolean validateLock(Context context, String queueName, long ticket) throws SQLException { + CurationQueueLock lock = findByQueueName(context, queueName); + return lock != null && lock.getTicket() == ticket; + } + + @Override + public boolean releaseLock(Context context, String queueName, long ticket) throws SQLException { + CurationQueueLock lock = findByQueueName(context, queueName); + if (lock != null && lock.getTicket() == ticket) { + delete(context, lock); + return true; + } + return false; + } +} diff --git a/dspace-api/src/main/resources/org/dspace/storage/rdbms/sqlmigration/h2/V8.0_2025.09.29__curation_task_queue.sql b/dspace-api/src/main/resources/org/dspace/storage/rdbms/sqlmigration/h2/V8.0_2025.09.29__curation_task_queue.sql new file mode 100644 index 000000000000..ce548b4262f2 --- /dev/null +++ b/dspace-api/src/main/resources/org/dspace/storage/rdbms/sqlmigration/h2/V8.0_2025.09.29__curation_task_queue.sql @@ -0,0 +1,37 @@ +-- +-- The contents of this file are subject to the license and copyright +-- detailed in the LICENSE and NOTICE files at the root of the source +-- tree and available online at +-- +-- http://www.dspace.org/license/ +-- + +-- ======================================================================= +-- DSpace Database Migration (H2) +-- Version: 8.0 +-- Date: 2025-09-29 +-- Description: Create table to persist curation task queue entries migrated +-- from previous file-based implementation (DBTaskQueue). +-- Each row represents a TaskQueueEntry. +-- ======================================================================= + +CREATE TABLE IF NOT EXISTS curation_task_queue ( + id INTEGER AUTO_INCREMENT PRIMARY KEY, + queue_name VARCHAR(128) NOT NULL, + eperson VARCHAR(256), + submit_time BIGINT NOT NULL, + tasks VARCHAR(1024) NOT NULL, + object_id VARCHAR(512), + UNIQUE (queue_name, tasks, object_id) +); +-- New table to manage locks on curation queues +CREATE TABLE IF NOT EXISTS curation_queue_lock ( + id INTEGER AUTO_INCREMENT PRIMARY KEY, + queue_name VARCHAR(128) NOT NULL, + ticket BIGINT NOT NULL, + owner_id VARCHAR(256), + lock_date TIMESTAMP NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_cql_queue_name ON curation_queue_lock(queue_name); +CREATE INDEX IF NOT EXISTS idx_ctq_queue ON curation_task_queue(queue_name); diff --git a/dspace-api/src/main/resources/org/dspace/storage/rdbms/sqlmigration/postgres/V8.0_2025.09.29__curation_task_queue.sql b/dspace-api/src/main/resources/org/dspace/storage/rdbms/sqlmigration/postgres/V8.0_2025.09.29__curation_task_queue.sql new file mode 100644 index 000000000000..005ea61b89c9 --- /dev/null +++ b/dspace-api/src/main/resources/org/dspace/storage/rdbms/sqlmigration/postgres/V8.0_2025.09.29__curation_task_queue.sql @@ -0,0 +1,38 @@ +-- +-- The contents of this file are subject to the license and copyright +-- detailed in the LICENSE and NOTICE files at the root of the source +-- tree and available online at +-- +-- http://www.dspace.org/license/ +-- + +-- ======================================================================= +-- DSpace Database Migration +-- Version: 8.0 +-- Date: 2025-09-29 +-- Description: Create table to persist curation task queue entries migrated +-- from previous file-based implementation (DBTaskQueue). +-- Each row represents a TaskQueueEntry. +-- +-- ======================================================================= + +CREATE TABLE IF NOT EXISTS curation_task_queue ( + id SERIAL PRIMARY KEY, + queue_name TEXT NOT NULL, + eperson TEXT, + submit_time BIGINT NOT NULL, + tasks TEXT NOT NULL, + object_id TEXT, + UNIQUE (queue_name, tasks, object_id) +); +-- New table to manage locks on curation queues +CREATE TABLE IF NOT EXISTS curation_queue_lock ( + id SERIAL PRIMARY KEY, + queue_name TEXT NOT NULL, + ticket BIGINT NOT NULL, + owner_id TEXT, + lock_date TIMESTAMP NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_cql_queue_name ON curation_queue_lock(queue_name); +CREATE INDEX IF NOT EXISTS idx_ctq_queue ON curation_task_queue(queue_name); diff --git a/dspace-api/src/test/data/dspaceFolder/config/modules/curate.cfg b/dspace-api/src/test/data/dspaceFolder/config/modules/curate.cfg index 92ba93ca3813..0e5ec49f733d 100644 --- a/dspace-api/src/test/data/dspaceFolder/config/modules/curate.cfg +++ b/dspace-api/src/test/data/dspaceFolder/config/modules/curate.cfg @@ -20,7 +20,7 @@ plugin.named.org.dspace.curate.CurationTask = org.dspace.ctask.general.MetadataV plugin.named.org.dspace.curate.CurationTask = org.dspace.ctask.testing.MarkerTask = marker ## task queue implementation -plugin.single.org.dspace.curate.TaskQueue = org.dspace.curate.FileTaskQueue +plugin.single.org.dspace.curate.TaskQueue = org.dspace.curate.DBTaskQueue # directory location of curation task queues curate.taskqueue.dir = ${dspace.dir}/ctqueues diff --git a/dspace-server-webapp/src/test/java/org/dspace/curate/CurationScriptIT.java b/dspace-server-webapp/src/test/java/org/dspace/curate/CurationScriptIT.java index 915b3b9a6529..15b8851ded73 100644 --- a/dspace-server-webapp/src/test/java/org/dspace/curate/CurationScriptIT.java +++ b/dspace-server-webapp/src/test/java/org/dspace/curate/CurationScriptIT.java @@ -23,6 +23,7 @@ import java.util.stream.Collectors; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.Strings; import org.dspace.app.rest.converter.DSpaceRunnableParameterConverter; import org.dspace.app.rest.matcher.ProcessMatcher; @@ -710,4 +711,187 @@ boolean checkIfInfoTextLoggedByHandler(TestDSpaceRunnableHandler handler, String return false; } + /** + * Test DBTaskQueue basic functionality including enqueue, dequeue and release operations. + * + * @author Stefano Maffei (stefano.maffei at 4science.com) + * @throws Exception if test fails + */ + @Test + public void testDBTaskQueue() throws Exception { + context.turnOffAuthorisationSystem(); + + // Create a test item for curation + parentCommunity = CommunityBuilder.createCommunity(context) + .withName("Test Community") + .build(); + Collection testCollection = CollectionBuilder.createCollection(context, parentCommunity) + .withName("Test Collection") + .build(); + Item testItem = ItemBuilder.createItem(context, testCollection) + .withTitle("Test Item for DBTaskQueue") + .withIssueDate("2026-02-10") + .withAuthor("Test, Author") + .build(); + + // Create DBTaskQueue instance + DBTaskQueue dbTaskQueue = new DBTaskQueue(); + + String queueName = "testQueue"; + long ticket = System.currentTimeMillis(); + + // Create test TaskQueueEntry using the correct constructor + TaskQueueEntry testEntry = new TaskQueueEntry( + admin.getID().toString(), + System.currentTimeMillis(), + Arrays.asList("noop"), + testItem.getHandle() + ); + + context.restoreAuthSystemState(); + + try { + // Test 1: Check initial queue is empty + String[] initialQueues = dbTaskQueue.queueNames(); + boolean queueExists = false; + for (String queue : initialQueues) { + if (queueName.equals(queue)) { + queueExists = true; + break; + } + } + assertFalse("Queue should not exist initially", queueExists); + + // Test 2: Enqueue a task + dbTaskQueue.enqueue(queueName, testEntry); + + // Verify queue exists after enqueue + String[] queuesAfterEnqueue = dbTaskQueue.queueNames(); + boolean foundQueue = false; + for (String queue : queuesAfterEnqueue) { + if (queueName.equals(queue)) { + foundQueue = true; + break; + } + } + assertTrue("Queue should exist after enqueue", foundQueue); + + // Test 3: Dequeue tasks + java.util.Set dequeuedTasks = dbTaskQueue.dequeue(queueName, ticket); + assertFalse("Dequeued tasks should not be empty", dequeuedTasks.isEmpty()); + org.junit.Assert.assertEquals("Should have exactly one task", 1, dequeuedTasks.size()); + + // Verify the dequeued task matches what we enqueued + TaskQueueEntry dequeuedTask = dequeuedTasks.iterator().next(); + org.junit.Assert.assertEquals("EPersonID should match", testEntry.getEpersonId(), + dequeuedTask.getEpersonId()); + org.junit.Assert.assertEquals("ObjectID should match", testEntry.getObjectId(), + dequeuedTask.getObjectId()); + + // Test 4: Release with remove=true + dbTaskQueue.release(queueName, ticket, true); + + // Test 5: Try to dequeue after release (should be empty as tasks were removed) + long differentTicket = ticket + 1000; + java.util.Set afterRelease = dbTaskQueue.dequeue(queueName, differentTicket); + assertTrue("Queue should be empty after release with remove=true", afterRelease.isEmpty()); + + // Clean up - release any remaining lock + dbTaskQueue.release(queueName, differentTicket, false); + + } catch (Exception e) { + // Clean up in case of error + try { + dbTaskQueue.release(queueName, ticket, true); + } catch (Exception ignored) { + // Ignore cleanup errors + } + throw e; + } + } + + /** + * Test curate script execution with -q option to process curation queues. + * This test verifies the integration between DBTaskQueue and the curation script. + * + * @author Stefano Maffei (stefano.maffei at 4science.com) + * @throws Exception if test fails + */ + @Test + public void testCurateScriptWithQueueOption() throws Exception { + context.turnOffAuthorisationSystem(); + + // Create a test item for curation + parentCommunity = CommunityBuilder.createCommunity(context) + .withName("Test Community for Queue") + .build(); + Collection testCollection = CollectionBuilder.createCollection(context, parentCommunity) + .withName("Test Collection for Queue") + .build(); + Item testItem = ItemBuilder.createItem(context, testCollection) + .withTitle("Test Item for Queue Processing") + .withIssueDate("2026-02-10") + .withAuthor("Queue, Test") + .build(); + + String queueName = "testQueueScript"; + + context.restoreAuthSystemState(); + + try { + // Step 1: Populate the queue using DBTaskQueue + DBTaskQueue dbTaskQueue = new DBTaskQueue(); + TaskQueueEntry testEntry = new TaskQueueEntry( + admin.getID().toString(), + System.currentTimeMillis(), + Arrays.asList("noop"), + testItem.getHandle() + ); + + dbTaskQueue.enqueue(queueName, testEntry); + + // Verify the queue has been populated + String[] queueNames = dbTaskQueue.queueNames(); + boolean queueFound = false; + for (String queue : queueNames) { + if (queueName.equals(queue)) { + queueFound = true; + break; + } + } + assertTrue("Queue should exist before processing", queueFound); + + // Step 2: Execute curate script with -q option using TestDSpaceRunnableHandler + String[] args = new String[] {"curate", "-q", queueName}; + TestDSpaceRunnableHandler testDSpaceRunnableHandler = new TestDSpaceRunnableHandler(); + + ScriptService scriptService = ScriptServiceFactory.getInstance().getScriptService(); + ScriptConfiguration scriptConfiguration = scriptService.getScriptConfiguration(args[0]); + + DSpaceRunnable script = null; + if (scriptConfiguration != null) { + script = scriptService.createDSpaceRunnableForScriptConfiguration(scriptConfiguration); + } + if (script != null) { + script.initialize(args, testDSpaceRunnableHandler, admin); + script.run(); + } + + // Step 3: Verify the queue has been processed (should be empty after processing) + // Note: The queue should be empty after successful processing as tasks are consumed + String[] queuesAfterProcessing = dbTaskQueue.queueNames(); + assertTrue("Queue should be empty after processing", ArrayUtils.isEmpty(queuesAfterProcessing)); + + } catch (Exception e) { + // Clean up in case of error + try { + DBTaskQueue dbTaskQueue = new DBTaskQueue(); + dbTaskQueue.release(queueName, System.currentTimeMillis(), true); + } catch (Exception ignored) { + // Ignore cleanup errors + } + throw e; + } + } + } diff --git a/dspace/config/hibernate.cfg.xml b/dspace/config/hibernate.cfg.xml index da84fc788676..8382ee51df1c 100644 --- a/dspace/config/hibernate.cfg.xml +++ b/dspace/config/hibernate.cfg.xml @@ -110,5 +110,8 @@ + + + diff --git a/dspace/config/modules/curate.cfg b/dspace/config/modules/curate.cfg index 6e75738de543..ea6148a49095 100644 --- a/dspace/config/modules/curate.cfg +++ b/dspace/config/modules/curate.cfg @@ -19,7 +19,7 @@ plugin.named.org.dspace.curate.CurationTask = org.dspace.ctask.general.RegisterD # add new tasks here (or in additional config files) ## task queue implementation -plugin.single.org.dspace.curate.TaskQueue = org.dspace.curate.FileTaskQueue +plugin.single.org.dspace.curate.TaskQueue = org.dspace.curate.DBTaskQueue # directory location of curation task queues curate.taskqueue.dir = ${dspace.dir}/ctqueues diff --git a/dspace/config/spring/api/core-dao-services.xml b/dspace/config/spring/api/core-dao-services.xml index 9089712a6249..c15734844235 100644 --- a/dspace/config/spring/api/core-dao-services.xml +++ b/dspace/config/spring/api/core-dao-services.xml @@ -51,6 +51,9 @@ + + +