Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.sling.distribution.queue.impl.ErrorQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.MultipleQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.PriorityQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.SingleQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
import org.apache.sling.distribution.queue.impl.resource.ResourceQueueProvider;
import org.apache.sling.distribution.queue.impl.simple.SimpleDistributionQueueProvider;
Expand Down Expand Up @@ -301,7 +302,13 @@ protected SimpleDistributionAgent createAgent(String agentName, BundleContext co
processingQueues.addAll(deliveryQueues.values());
exportQueueStrategy = new AsyncDeliveryDispatchingStrategy(deliveryQueues);
} else {
exportQueueStrategy = new MultipleQueueDispatchingStrategy(endpointNames.toArray(new String[endpointNames.size()]));
if (endpointNames.size() == 1) {
exportQueueStrategy = new SingleQueueDispatchingStrategy(endpointNames
.toArray(new String[0])[0]);
} else {
exportQueueStrategy = new MultipleQueueDispatchingStrategy(endpointNames
.toArray(new String[endpointNames.size()]));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import javax.jcr.Binary;
import javax.jcr.Node;
import javax.jcr.PathNotFoundException;
import javax.jcr.RepositoryException;
import javax.jcr.nodetype.NodeType;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -341,10 +342,10 @@ public static void release(Resource resource, @NotNull String[] holderNames) thr
Node refs = parent.getNode("refs");

for (String holderName : holderNames) {
Node refNode = refs.getNode(holderName);
if (refNode != null) {
try {
Node refNode = refs.getNode(holderName);
refNode.remove();
}
} catch (PathNotFoundException ign) { }
}

if (!refs.hasProperty("released")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,9 @@ public Iterable<DistributionQueueItemStatus> add(@NotNull DistributionPackage di
// second add the package to all queues
for (String queueName : queueNames) {
DistributionQueue queue = queueProvider.getQueue(queueName);
DistributionQueueItemStatus status = new DistributionQueueItemStatus(DistributionQueueItemState.ERROR, queue.getName());

DistributionQueueEntry queueEntry = queue.add(queueItem);

if (queueEntry != null) {
status = queueEntry.getStatus();
} else {
DistributionQueueItemStatus status = addItemToQueue(queueItem, queue);
if (null == status) {
status = new DistributionQueueItemStatus(DistributionQueueItemState.ERROR, queueName);
DistributionPackageUtils.release(distributionPackage, queueName);
log.error("cannot add package {} to queue {}", distributionPackage.getId(), queueName);
}
Expand All @@ -86,8 +82,21 @@ public List<String> getQueueNames() {
return Arrays.asList(queueNames);
}

private DistributionQueueItem getItem(DistributionPackage distributionPackage) {
protected DistributionQueueItem getItem(DistributionPackage distributionPackage) {
return DistributionPackageUtils.toQueueItem(distributionPackage);
}

}
protected DistributionQueueItemStatus addItemToQueue(@NotNull DistributionQueueItem queueItem,
@NotNull DistributionQueue queue) throws DistributionException {

DistributionQueueItemStatus status = null;

DistributionQueueEntry queueEntry = queue.add(queueItem);

if (queueEntry != null) {
status = queueEntry.getStatus();
}

return status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,54 @@
*/
package org.apache.sling.distribution.queue.impl;

import java.util.Collections;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.impl.SharedDistributionPackage;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The default strategy for delivering packages to queues. Each agent just manages a single queue,
* no failure / stuck handling where each package is put regardless of anything.
*/
public class SingleQueueDispatchingStrategy extends MultipleQueueDispatchingStrategy {

private final Logger log = LoggerFactory.getLogger(getClass());
private static final String DEFAULT_QUEUE_NAME = DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME;

private final String queueName;

public SingleQueueDispatchingStrategy(String queueName) {
super(new String[]{queueName});
this.queueName = queueName;
}

public SingleQueueDispatchingStrategy() {
super(new String[]{DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME});
this(DEFAULT_QUEUE_NAME);
}

@Override
public Iterable<DistributionQueueItemStatus> add(@NotNull DistributionPackage distributionPackage,
@NotNull DistributionQueueProvider queueProvider) throws DistributionException {
if (!(distributionPackage instanceof SharedDistributionPackage)) {
throw new DistributionException("distribution package must be a shared package to be added in multiple queues");
}

DistributionQueueItem queueItem = getItem(distributionPackage);

DistributionQueue queue = queueProvider.getQueue(queueName);
DistributionQueueItemStatus status = addItemToQueue(queueItem, queue);
if (null == status) {
status = new DistributionQueueItemStatus(DistributionQueueItemState.ERROR, queueName);
log.error("cannot add package {} to queue {}", distributionPackage.getId(), queueName);
}

return Collections.singletonList(status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public static Resource createResource(Resource root, DistributionQueueItem queue
properties.put("sling:resourceType", RESOURCE_ITEM);
properties.put(ENTERED_DATE, Calendar.getInstance());
Resource resourceItem = ResourceUtil.getOrCreateResource(resourceResolver, entryPath, properties,
RESOURCE_FOLDER, true);
RESOURCE_FOLDER, false);

resourceResolver.commit();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Iterator;

import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.impl.SharedDistributionPackage;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
Expand All @@ -45,7 +46,7 @@ public class SingleQueueDistributionStrategyTest {
@Test
public void testPackageAdditionWithSucceedingItemDelivery() throws Exception {
SingleQueueDispatchingStrategy singleQueueDistributionStrategy = new SingleQueueDispatchingStrategy();
DistributionPackage distributionPackage = mock(DistributionPackage.class);
DistributionPackage distributionPackage = mock(SharedDistributionPackage.class);
DistributionQueueProvider queueProvider = mock(DistributionQueueProvider.class);
DistributionQueue queue = mock(DistributionQueue.class);
when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
Expand All @@ -64,7 +65,7 @@ public void testPackageAdditionWithSucceedingItemDelivery() throws Exception {
@Test
public void testPackageAdditionWithFailingItemDelivery() throws Exception {
SingleQueueDispatchingStrategy singleQueueDistributionStrategy = new SingleQueueDispatchingStrategy();
DistributionPackage distributionPackage = mock(DistributionPackage.class);
DistributionPackage distributionPackage = mock(SharedDistributionPackage.class);
DistributionQueueProvider queueProvider = mock(DistributionQueueProvider.class);
DistributionQueue queue = mock(DistributionQueue.class);
DistributionQueueItem queueItem = new DistributionQueueItem("packageId", new HashMap<String, Object>());
Expand All @@ -82,7 +83,7 @@ public void testPackageAdditionWithFailingItemDelivery() throws Exception {
@Test
public void testPackageAdditionWithNotNullItemStateFromTheQueue() throws Exception {
SingleQueueDispatchingStrategy singleQueueDistributionStrategy = new SingleQueueDispatchingStrategy();
DistributionPackage distributionPackage = mock(DistributionPackage.class);
DistributionPackage distributionPackage = mock(SharedDistributionPackage.class);
DistributionQueueProvider queueProvider = mock(DistributionQueueProvider.class);
DistributionQueue queue = mock(DistributionQueue.class);
when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
Expand Down