Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to use EntityManagerFactory from JNDI. #693

Open
wants to merge 2 commits into
base: 1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bpm/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@
<groupId>org.jbpm</groupId>
<artifactId>jbpm-flow-builder</artifactId>
</dependency>
<dependency>
<groupId>org.jbpm</groupId>
<artifactId>jbpm-human-task-audit</artifactId>
</dependency>
<dependency>
<groupId>org.jbpm</groupId>
<artifactId>jbpm-human-task-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Map;
import java.util.Properties;

import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import javax.transaction.TransactionManager;
Expand All @@ -34,7 +36,7 @@
import org.jbpm.persistence.JpaProcessPersistenceContextManager;
import org.jbpm.persistence.processinstance.JPAProcessInstanceManagerFactory;
import org.jbpm.persistence.processinstance.JPASignalManagerFactory;
import org.jbpm.shared.services.impl.JbpmJTATransactionManager;
import org.jbpm.services.task.persistence.JPATaskPersistenceContextManager;
import org.jbpm.workflow.instance.impl.WorkflowProcessInstanceImpl;
import org.kie.api.runtime.EnvironmentName;
import org.kie.api.runtime.process.ProcessInstance;
Expand All @@ -43,6 +45,7 @@
import org.kie.internal.process.CorrelationAwareProcessRuntime;
import org.kie.internal.process.CorrelationKey;
import org.kie.internal.process.CorrelationKeyFactory;
import org.kie.internal.runtime.manager.RuntimeManagerRegistry;
import org.kie.internal.task.api.UserGroupCallback;
import org.switchyard.Context;
import org.switchyard.Exchange;
Expand All @@ -65,10 +68,12 @@
import org.switchyard.component.bpm.util.WorkItemHandlers;
import org.switchyard.component.common.knowledge.exchange.KnowledgeExchangeHandler;
import org.switchyard.component.common.knowledge.exchange.KnowledgeOperation;
import org.switchyard.component.common.knowledge.session.KnowledgeDisposal;
import org.switchyard.component.common.knowledge.session.KnowledgeSession;
import org.switchyard.component.common.knowledge.util.Disposals;
import org.switchyard.component.common.knowledge.util.Environments;
import org.switchyard.component.common.knowledge.util.Listeners;
import org.switchyard.config.model.property.PropertyModel;

/**
* A "bpm" implementation of a KnowledgeExchangeHandler.
Expand All @@ -78,14 +83,15 @@
public class BPMExchangeHandler extends KnowledgeExchangeHandler<BPMComponentImplementationModel> {

private static final KnowledgeOperation DEFAULT_OPERATION = new KnowledgeOperation(BPMOperationType.START_PROCESS);
private static final String PERSISTENCE_JNDI_NAME = "persistenceJndiName";

private final boolean _persistent;
private final String _processId;
private String _persistenceJndiName;
private BPMProcessEventListener _processEventListener;
private UserGroupCallback _userGroupCallback;
private CorrelationKeyFactory _correlationKeyFactory;
private EntityManagerFactory _processEntityManagerFactory;
private EntityManagerFactory _taskEntityManagerFactory;
private EntityManagerFactory _entityManagerFactory;
private BPMTaskService _taskService;

/**
Expand All @@ -98,6 +104,10 @@ public BPMExchangeHandler(BPMComponentImplementationModel model, ServiceDomain s
super(model, serviceDomain, serviceName);
_persistent = model.isPersistent();
_processId = model.getProcessId();
if (model.getProperties() != null) {
PropertyModel persistenceJndiNameProp = model.getProperties().getProperty(PERSISTENCE_JNDI_NAME);
_persistenceJndiName = persistenceJndiNameProp != null ? persistenceJndiNameProp.getValue() : null;
}
}

/**
Expand All @@ -110,11 +120,24 @@ protected void doStart() {
_userGroupCallback = UserGroupCallbacks.newUserGroupCallback(getModel(), getLoader());
_correlationKeyFactory = KieInternalServices.Factory.get().newCorrelationKeyFactory();
if (_persistent) {
_processEntityManagerFactory = Persistence.createEntityManagerFactory("org.jbpm.persistence.jpa");
_taskEntityManagerFactory = Persistence.createEntityManagerFactory("org.jbpm.services.task");
_taskService = BPMTaskService.Factory.newTaskService(Environments.getEnvironment(super.getEnvironmentOverrides()), _taskEntityManagerFactory, new JbpmJTATransactionManager(), _userGroupCallback, getLoader());
BPMTaskServiceRegistry.putTaskService(getServiceDomain().getName(), getServiceName(), _taskService);
if (_persistenceJndiName != null) {
try {
InitialContext cntx = new InitialContext();
_entityManagerFactory = (EntityManagerFactory) cntx.lookup(_persistenceJndiName);
} catch (NamingException e) {
_persistenceJndiName = null;
_entityManagerFactory = createDefaultEntityManagerFactory();
}
} else {
_entityManagerFactory = createDefaultEntityManagerFactory();
}
}
_taskService = BPMTaskService.Factory.newTaskService(Environments.getEnvironment(getEnvironmentOverrides()), _entityManagerFactory, _userGroupCallback, getLoader());
BPMTaskServiceRegistry.putTaskService(getServiceDomain().getName(), getServiceName(), _taskService);
}

private static EntityManagerFactory createDefaultEntityManagerFactory() {
return Persistence.createEntityManagerFactory("org.jbpm.persistence.jpa");
}

/**
Expand All @@ -126,13 +149,11 @@ protected void doStop() {
_processEventListener = null;
_userGroupCallback = null;
_correlationKeyFactory = null;
if (_processEntityManagerFactory != null) {
Disposals.newDisposal(_processEntityManagerFactory).dispose();
_processEntityManagerFactory = null;
}
if (_taskEntityManagerFactory != null) {
Disposals.newDisposal(_taskEntityManagerFactory).dispose();
_taskEntityManagerFactory = null;
if (_entityManagerFactory != null) {
if (_persistenceJndiName == null) {
Disposals.newDisposal(_entityManagerFactory).dispose();
}
_entityManagerFactory = null;
}
_taskService = null;
BPMTaskServiceRegistry.removeTaskService(getServiceDomain().getName(), getServiceName());
Expand Down Expand Up @@ -160,10 +181,11 @@ protected Map<String, Object> getEnvironmentOverrides() {
if (_persistent) {
UserTransaction ut = AS7TransactionHelper.getUserTransaction();
TransactionManager tm = AS7TransactionHelper.getTransactionManager();
env.put(EnvironmentName.ENTITY_MANAGER_FACTORY, _processEntityManagerFactory);
env.put(EnvironmentName.ENTITY_MANAGER_FACTORY, _entityManagerFactory);
env.put(EnvironmentName.TRANSACTION, ut);
env.put(EnvironmentName.TRANSACTION_MANAGER, new JtaTransactionManager(ut, null, tm));
env.put(EnvironmentName.PERSISTENCE_CONTEXT_MANAGER, new JpaProcessPersistenceContextManager(Environments.getEnvironment(env)));
env.put(EnvironmentName.TASK_PERSISTENCE_CONTEXT_MANAGER, new JPATaskPersistenceContextManager(Environments.getEnvironment(env)));
}
return env;
}
Expand Down Expand Up @@ -304,8 +326,20 @@ private KnowledgeSession getBPMSession(Exchange exchange, Message message) {
}
Listeners.registerListener(_processEventListener, session.getStateful());
// TODO: the use of BPMRuntimeEnvironment/Manager should be removed after SWITCHYARD-1584
BPMRuntimeEnvironment runtimeEnvironment = new BPMRuntimeEnvironment(session.getStateful(), _processEntityManagerFactory, _userGroupCallback, getLoader());
BPMRuntimeManager runtimeManager = new BPMRuntimeManager(session.getStateful(), _taskService, getDeploymentId(), runtimeEnvironment);
// TODO: along with the above, use the RuntimeEnvironmentBuilderFactory
final String deploymentId = getDeploymentId();
BPMRuntimeEnvironment runtimeEnvironment = new BPMRuntimeEnvironment(session.getStateful(), _entityManagerFactory, _userGroupCallback, getLoader());
BPMRuntimeManager runtimeManager = new BPMRuntimeManager(session.getStateful(), _taskService, deploymentId, runtimeEnvironment);
final RuntimeManagerRegistry runtimeManagerRegistry = RuntimeManagerRegistry.get();
if (!runtimeManagerRegistry.isRegistered(deploymentId)) {
runtimeManagerRegistry.register(runtimeManager);
session.addDisposals(new KnowledgeDisposal() {
@Override
public void dispose() {
runtimeManagerRegistry.remove(deploymentId);
}
});
}
WorkItemHandlers.registerWorkItemHandlers(getModel(), getLoader(), session.getStateful(), runtimeManager, getServiceDomain());
return session;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@
import javax.persistence.EntityManagerFactory;

import org.jbpm.services.task.HumanTaskServiceFactory;
import org.jbpm.shared.services.api.JbpmServicesTransactionManager;
import org.jbpm.shared.services.impl.events.JbpmServicesEventListener;
import org.jbpm.services.task.lifecycle.listeners.TaskLifeCycleEventListener;
import org.kie.api.runtime.Environment;
import org.kie.api.task.model.Task;
// SWITCHYARD-1755: internal api usage still required (until jbpm services usage is resolved)
import org.kie.internal.task.api.ContentMarshallerContext;
import org.kie.internal.task.api.EventService;
import org.kie.internal.task.api.InternalTaskService;
import org.kie.internal.task.api.UserGroupCallback;
import org.kie.internal.task.api.model.NotificationEvent;
import org.switchyard.component.bpm.transaction.AS7TransactionHelper;
import org.switchyard.component.common.knowledge.util.Environments;

Expand All @@ -38,7 +35,7 @@
*
* @author David Ward &lt;<a href="mailto:[email protected]">[email protected]</a>&gt; &copy; 2013 Red Hat Inc.
*/
public interface BPMTaskService extends InternalTaskService, EventService<JbpmServicesEventListener<NotificationEvent>,JbpmServicesEventListener<Task>> {
public interface BPMTaskService extends InternalTaskService, EventService<TaskLifeCycleEventListener> {

/**
* The BPM task service factory.
Expand All @@ -49,24 +46,27 @@ public static final class Factory {
* Creates a new BPM task service.
* @param environment the environment
* @param entityManagerFactory the entity manager factory
* @param jbpmServicesTransactionManager the jbpm services transaction manager
* @param userGroupCallback the user group callback
* @param loader the classloader
* @return the bpm task service
*/
public static final BPMTaskService newTaskService(
Environment environment,
EntityManagerFactory entityManagerFactory,
JbpmServicesTransactionManager jbpmServicesTransactionManager,
UserGroupCallback userGroupCallback,
ClassLoader loader) {
InternalTaskService internalTaskService = (InternalTaskService)HumanTaskServiceFactory.newTaskServiceConfigurator()
.entityManagerFactory(entityManagerFactory)
.transactionManager(jbpmServicesTransactionManager)
.userGroupCallback(userGroupCallback)
.getTaskService();
String deploymentId = (String)environment.get(Environments.DEPLOYMENT_ID);
internalTaskService.addMarshallerContext(deploymentId, new ContentMarshallerContext(environment, loader));
InternalTaskService internalTaskService;
if (entityManagerFactory != null) {
internalTaskService = (InternalTaskService)HumanTaskServiceFactory.newTaskServiceConfigurator()
.environment(environment)
.entityManagerFactory(entityManagerFactory)
.userGroupCallback(userGroupCallback)
.getTaskService();
String deploymentId = (String)environment.get(Environments.DEPLOYMENT_ID);
internalTaskService.addMarshallerContext(deploymentId, new ContentMarshallerContext(environment, loader));
} else {
internalTaskService = null;
}
InvocationHandler invocationHandler = new TaskServiceInvocationHandler(internalTaskService);
return (BPMTaskService)Proxy.newProxyInstance(BPMTaskService.class.getClassLoader(), new Class[]{BPMTaskService.class}, invocationHandler);
}
Expand All @@ -81,17 +81,23 @@ private TaskServiceInvocationHandler(InternalTaskService internalTaskService) {

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object ret;
AS7TransactionHelper utx = new AS7TransactionHelper(true);
try {
utx.begin();
ret = method.invoke(_internalTaskService, args);
utx.commit();
} catch (Throwable t) {
utx.rollback();
throw t;
if (_internalTaskService == null) {
return null;
} else if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(_internalTaskService, args);
} else {
Object ret;
AS7TransactionHelper utx = new AS7TransactionHelper(true);
try {
utx.begin();
ret = method.invoke(_internalTaskService, args);
utx.commit();
} catch (Throwable t) {
utx.rollback();
throw t;
}
return ret;
}
return ret;
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,18 @@ public static void registerWorkItemHandlers(BPMComponentImplementationModel mode
if (!registeredNames.contains(HUMAN_TASK) && runtimeManager != null) {
RuntimeEngine runtimeEngine = runtimeManager.getRuntimeEngine();
ExternalTaskEventListener listener = new ExternalTaskEventListener();
listener.setRuntimeManager(runtimeManager);
LocalHTWorkItemHandler htwih = new LocalHTWorkItemHandler();
htwih.setRuntimeManager(runtimeManager);
// NOTE: Cannot remove next two blocks for SWITCHYARD-1755 yet...
if (runtimeEngine.getTaskService() instanceof EventService) {
((EventService)runtimeEngine.getTaskService()).registerTaskLifecycleEventListener(listener);
((EventService)runtimeEngine.getTaskService()).registerTaskEventListener(listener);
}
if (processRuntime instanceof Disposable) {
((Disposable)processRuntime).addDisposeListener(new DisposeListener() {
@Override
public void onDispose(RuntimeEngine re) {
if (re.getTaskService() instanceof EventService) {
((EventService)re.getTaskService()).clearTaskLifecycleEventListeners();
((EventService)re.getTaskService()).clearTasknotificationEventListeners();
((EventService)re.getTaskService()).clearTaskEventListeners();
}
}
});
Expand Down