diff --git a/sca/src/main/java/org/switchyard/component/sca/SCAInvoker.java b/sca/src/main/java/org/switchyard/component/sca/SCAInvoker.java index f4900b0d7..e3c077e14 100644 --- a/sca/src/main/java/org/switchyard/component/sca/SCAInvoker.java +++ b/sca/src/main/java/org/switchyard/component/sca/SCAInvoker.java @@ -13,13 +13,16 @@ */ package org.switchyard.component.sca; +import java.io.IOException; + import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.xml.namespace.QName; -import org.jboss.logging.Logger; import org.jboss.jbossts.txbridge.outbound.OutboundBridge; import org.jboss.jbossts.txbridge.outbound.OutboundBridgeManager; +import org.jboss.logging.Logger; +import org.oasis_open.docs.ws_tx.wscoor._2006._06.CoordinationContextType; import org.switchyard.Context; import org.switchyard.Exchange; import org.switchyard.ExchangePattern; @@ -33,37 +36,40 @@ import org.switchyard.config.model.composite.SCABindingModel; import org.switchyard.deploy.BaseServiceHandler; import org.switchyard.label.BehaviorLabel; +import org.switchyard.remote.RemoteEndpoint; +import org.switchyard.remote.RemoteInvoker; import org.switchyard.remote.RemoteMessage; +import org.switchyard.remote.RemoteMessages; import org.switchyard.remote.RemoteRegistry; -import org.switchyard.remote.cluster.ClusteredInvoker; import org.switchyard.remote.cluster.LoadBalanceStrategy; import org.switchyard.remote.cluster.RandomStrategy; import org.switchyard.remote.cluster.RoundRobinStrategy; +import org.switchyard.remote.http.HttpInvoker; import org.switchyard.remote.http.HttpInvokerLabel; import org.switchyard.runtime.event.ExchangeCompletionEvent; import com.arjuna.mw.wst11.TransactionManagerFactory; import com.arjuna.mwlabs.wst11.at.context.TxContextImple; -import org.oasis_open.docs.ws_tx.wscoor._2006._06.CoordinationContextType; - /** * Handles outbound communication to an SCA service endpoint. */ -public class SCAInvoker extends BaseServiceHandler { +public class SCAInvoker extends BaseServiceHandler implements RemoteInvoker { private static Logger _log = Logger.getLogger(SCAInvoker.class); private final SCABindingModel _config; private final String _bindingName; private final String _referenceName; - private ClusteredInvoker _invoker; + private RemoteRegistry _registry; + private LoadBalanceStrategy _loadBalancer; private TransactionContextSerializer _txSerializer = new TransactionContextSerializer(); /** * Create a new SCAInvoker for invoking local endpoints. * @param config binding configuration model */ + // TODO: IMHO this constructor should not exist public SCAInvoker(SCABindingModel config) { _config = config; _bindingName = config.getName(); @@ -77,12 +83,9 @@ public SCAInvoker(SCABindingModel config) { */ public SCAInvoker(SCABindingModel config, RemoteRegistry registry) { this(config); - if (config.isLoadBalanced()) { - LoadBalanceStrategy loadBalancer = createLoadBalancer(config.getLoadBalance()); - _invoker = new ClusteredInvoker(registry, loadBalancer); - } else { - _invoker = new ClusteredInvoker(registry); - } + + _registry = registry; + _loadBalancer = createLoadBalancer(config.getLoadBalance()); } @Override @@ -95,25 +98,31 @@ public void handleMessage(Exchange exchange) throws HandlerException { throw SCAMessages.MESSAGES.referenceBindingNotStarted(_referenceName, _bindingName); } try { - if (_config.isClustered()) { - invokeRemote(exchange); - } else { - invokeLocal(exchange); + // Figure out the QName for the service were invoking + QName serviceName = getTargetServiceName(exchange); + + // Maintain old functionality + if (_config.isPreferLocal()){ + if ((_registry != null) + && (_registry.hasLocalEndpoint(serviceName)) && (_loadBalancer != null)) { + invokeRemote(exchange, serviceName); + } else { + invokeLocal(exchange, serviceName); + } + }else{ + if (_config.isClustered()){ + invokeRemote(exchange, serviceName); + }else{ + invokeLocal(exchange, serviceName); + } } } catch (SwitchYardException syEx) { throw new HandlerException(syEx.getMessage()); } } - // This method exists for test purposes and should not be used at runtime. Initialization - // of the invoker instance occurs in the constructor for SCAInvoker. - void setInvoker(ClusteredInvoker invoker) { - _invoker = invoker; - } - - private void invokeLocal(Exchange exchange) throws HandlerException { - // Figure out the QName for the service were invoking - QName serviceName = getTargetServiceName(exchange); + private void invokeLocal(Exchange exchange, QName serviceName) throws HandlerException { + // Get a handle for the reference and use a copy of the exchange to invoke it ServiceReference ref = exchange.getProvider().getDomain().getServiceReference(serviceName); if (ref == null) { @@ -144,9 +153,7 @@ private void invokeLocal(Exchange exchange) throws HandlerException { } } - private void invokeRemote(Exchange exchange) throws HandlerException { - // Figure out the QName for the service were invoking - QName serviceName = getTargetServiceName(exchange); + private void invokeRemote(Exchange exchange, QName serviceName) throws HandlerException { RemoteMessage request = new RemoteMessage() .setDomain(exchange.getProvider().getDomain().getName()) @@ -157,7 +164,7 @@ private void invokeRemote(Exchange exchange) throws HandlerException { boolean transactionPropagated = bridgeOutgoingTransaction(request); try { - RemoteMessage reply = _invoker.invoke(request); + RemoteMessage reply = this.invoke(request); if (transactionPropagated) { bridgeIncomingTransaction(); } @@ -274,6 +281,8 @@ private HandlerException createHandlerException(Object content) { LoadBalanceStrategy createLoadBalancer(String strategy) { + if (strategy==null) return null; + if (RoundRobinStrategy.class.getSimpleName().equals(strategy)) { return new RoundRobinStrategy(); } else if (RandomStrategy.class.getSimpleName().equals(strategy)) { @@ -290,4 +299,13 @@ LoadBalanceStrategy createLoadBalancer(String strategy) { } } } + + @Override + public RemoteMessage invoke(RemoteMessage request) throws IOException { + RemoteEndpoint ep = _loadBalancer.selectEndpoint(request.getService()); + if (ep == null) { + throw RemoteMessages.MESSAGES.noRemoteEndpointFound(request.getService().toString()); + } + return new HttpInvoker(ep.getEndpoint()).invoke(request); + } } diff --git a/sca/src/test/java/org/switchyard/component/sca/SCAInvokerTest.java b/sca/src/test/java/org/switchyard/component/sca/SCAInvokerTest.java index c16bc4579..fbe9efcb8 100644 --- a/sca/src/test/java/org/switchyard/component/sca/SCAInvokerTest.java +++ b/sca/src/test/java/org/switchyard/component/sca/SCAInvokerTest.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.LinkedList; +import java.util.List; import javax.xml.namespace.QName; @@ -324,6 +325,10 @@ public boolean isClustered() { public String getTarget() { return "test-target"; } + @Override + public String getLoadBalance() { + return "RoundRobinStrategy"; + } public CompositeReferenceModel getReference() { return new V1CompositeReferenceModel(); }; @@ -331,16 +336,38 @@ public CompositeReferenceModel getReference() { // Mock the invoker so that we don't need an actual remote endpoint final LinkedList msgs = new LinkedList(); - ClusteredInvoker clInovker = new ClusteredInvoker(null) { + + SCAInvoker scaInvoker = new SCAInvoker(config, new RemoteRegistry() { + @Override + public void removeEndpoint(RemoteEndpoint endpoint) { + } + + @Override + public boolean hasLocalEndpoint(QName serviceName) { + return false; + } + + @Override + public RemoteEndpoint getLocalEndpoint(QName serviceName) { + return null; + } + + @Override + public List getEndpoints(QName serviceName) { + return null; + } + + @Override + public void addEndpoint(RemoteEndpoint endpoint) { + } + }){ @Override public RemoteMessage invoke(RemoteMessage request) throws IOException { msgs.push(request); return null; } }; - - SCAInvoker scaInvoker = new SCAInvoker(config); - scaInvoker.setInvoker(clInovker); +// scaInvoker.setInvoker(clInovker); scaInvoker.start(); // Verify that exchange is mapped to remote message correctly