diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index d5e54b91f16..62736313651 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -824,4 +824,10 @@ fields to be excluded from being saved in note files, with Paragraph prefix mean the fields in Paragraph, e.g. Paragraph.results + + zeppelin.eventbus.enabled + false + Enables the new event-driven architecture using an in-process EventBus + + diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 3b9ebee0bad..7f12fe69785 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -895,6 +895,8 @@ public boolean isPrometheusMetricEnabled() { return getBoolean(ConfVars.ZEPPELIN_METRIC_ENABLE_PROMETHEUS); } + public boolean isEventBusEnabled() { return getBoolean(ConfVars.ZEPPELIN_EVENTBUS_ENABLED); } + public DEFAULT_UI getDefaultUi() { return DEFAULT_UI.valueOf(getString(ConfVars.ZEPPELIN_DEFAULT_UI).toUpperCase()); } @@ -1131,7 +1133,8 @@ public enum ConfVars { ZEPPELIN_SPARK_ONLY_YARN_CLUSTER("zeppelin.spark.only_yarn_cluster", false), ZEPPELIN_SESSION_CHECK_INTERVAL("zeppelin.session.check_interval", 60 * 10 * 1000), ZEPPELIN_NOTE_CACHE_THRESHOLD("zeppelin.note.cache.threshold", 50), - ZEPPELIN_NOTE_FILE_EXCLUDE_FIELDS("zeppelin.note.file.exclude.fields", ""); + ZEPPELIN_NOTE_FILE_EXCLUDE_FIELDS("zeppelin.note.file.exclude.fields", ""), + ZEPPELIN_EVENTBUS_ENABLED("zeppelin.eventbus.enabled", false); private String varName; private Class varClass; diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index eca789e38b4..34e4572a7e7 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -64,6 +64,9 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.conf.ZeppelinConfiguration.DEFAULT_UI; import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.event.EventBus; +import org.apache.zeppelin.event.NoOpEventBus; +import org.apache.zeppelin.event.ZeppelinEventBus; import org.apache.zeppelin.healthcheck.HealthChecks; import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.helium.Helium; @@ -178,6 +181,11 @@ protected void configure() { bind(storage).to(ConfigStorage.class); bindAsContract(PluginManager.class).in(Singleton.class); bind(GsonNoteParser.class).to(NoteParser.class).in(Singleton.class); + if (zConf.isEventBusEnabled()) { + bind(ZeppelinEventBus.class).to(EventBus.class).in(Singleton.class); + } else { + bind(NoOpEventBus.class).to(EventBus.class).in(Singleton.class); + } bindAsContract(InterpreterFactory.class).in(Singleton.class); bindAsContract(NotebookRepoSync.class).to(NotebookRepo.class).in(Singleton.class); bindAsContract(Helium.class).in(Singleton.class); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index d0d0af683c0..3b0e7939e0c 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -40,6 +40,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.rxjava3.disposables.Disposable; +import jakarta.annotation.PreDestroy; import jakarta.inject.Inject; import jakarta.inject.Provider; import jakarta.websocket.CloseReason; @@ -62,6 +64,9 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.display.Input; +import org.apache.zeppelin.event.EventBus; +import org.apache.zeppelin.event.NoteEvent; +import org.apache.zeppelin.event.NoteRemoveEvent; import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.helium.HeliumPackage; import org.apache.zeppelin.interpreter.InterpreterGroup; @@ -156,6 +161,8 @@ String getKey() { private AuthorizationService authorizationService; private Provider configurationServiceProvider; private Provider jobManagerServiceProvider; + private Disposable disposable; + public NotebookServer() { NotebookServer.self.set(this); @@ -167,6 +174,25 @@ public void setZeppelinConfiguration(ZeppelinConfiguration zConf) { this.zConf = zConf; } + @Inject + public void registerEventBus(EventBus eventBus, ZeppelinConfiguration zConf) { + if (!zConf.isEventBusEnabled()) { + LOGGER.debug("ZeppelinEventBus is disabled"); + return; + } + + this.disposable = eventBus.observe(NoteEvent.class) + .subscribe(this::handleNoteEvent); + } + + @PreDestroy + public void cleanup() { + if (disposable != null && !disposable.isDisposed()) { + disposable.dispose(); + } + executorService.shutdown(); + } + @Inject public void setNoteParser(Provider noteParser) { this.noteParser = noteParser; @@ -1876,19 +1902,12 @@ public void onParagraphRemove(Paragraph p) { @Override public void onNoteRemove(Note note, AuthenticationInfo subject) { - try { - broadcastUpdateNoteJobInfo(note, System.currentTimeMillis() - 5000); - } catch (IOException e) { - LOGGER.warn("can not broadcast for job manager: {}", e.getMessage(), e); - } - - try { - getJobManagerService().removeNoteJobInfo(note.getId(), null, - new JobManagerServiceCallback()); - } catch (IOException e) { - LOGGER.warn("can not broadcast for job manager: {}", e.getMessage(), e); + if (zConf.isEventBusEnabled()) { + LOGGER.debug("ZeppelinEventBus is enabed"); + return; } + handleNoteRemove(note); } @Override @@ -2330,4 +2349,28 @@ public void onFailure(Exception ex, ServiceContext context) throws IOException { } } } + + private void handleNoteEvent(NoteEvent event) { + if (event instanceof NoteRemoveEvent) { + Note note = event.getNote(); + handleNoteRemove(note); + } else { + LOGGER.warn("Unknown event type: {}", event.getClass().getName()); + } + } + + private void handleNoteRemove(Note note) { + try { + broadcastUpdateNoteJobInfo(note, System.currentTimeMillis() - 5000); + } catch (IOException e) { + LOGGER.warn("can not broadcast for job manager: {}", e.getMessage(), e); + } + + try { + getJobManagerService().removeNoteJobInfo(note.getId(), null, + new JobManagerServiceCallback()); + } catch (IOException e) { + LOGGER.warn("can not broadcast for job manager: {}", e.getMessage(), e); + } + } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/MiniZeppelinServer.java b/zeppelin-server/src/test/java/org/apache/zeppelin/MiniZeppelinServer.java index 0430ed9fc1c..9e73a72525b 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/MiniZeppelinServer.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/MiniZeppelinServer.java @@ -89,6 +89,7 @@ public MiniZeppelinServer(String classname, String zeppelinConfiguration) throws zConf = ZeppelinConfiguration.load(zeppelinConfiguration); zConf.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), zeppelinHome.getAbsoluteFile().toString()); + zConf.setProperty(ConfVars.ZEPPELIN_EVENTBUS_ENABLED.getVarName(), "true"); Optional webWar = getWebWar(); Optional webAngularWar = getWebAngularWar(); if (webWar.isPresent()) { diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java index 152d0856688..2764832c122 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java @@ -46,6 +46,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.event.EventBus; +import org.apache.zeppelin.event.ZeppelinEventBus; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.Interpreter.FormType; import org.apache.zeppelin.interpreter.InterpreterFactory; @@ -102,6 +104,7 @@ void setUp(TestInfo testInfo) throws Exception { ZeppelinConfiguration zConf = ZeppelinConfiguration.load(); zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath()); + zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_EVENTBUS_ENABLED.getVarName(), "true"); // enable cron for testNoteUpdate method if ("testNoteUpdate()".equals(testInfo.getDisplayName())){ confDir = Files.createTempDirectory("confDir").toAbsolutePath().toFile(); @@ -138,6 +141,7 @@ void setUp(TestInfo testInfo) throws Exception { NoteManager noteManager = new NoteManager(notebookRepo, zConf); AuthorizationService authorizationService = new AuthorizationService(noteManager, zConf, storage); + EventBus eventBus = new ZeppelinEventBus(); notebook = new Notebook( zConf, @@ -147,7 +151,7 @@ void setUp(TestInfo testInfo) throws Exception { mockInterpreterFactory, mockInterpreterSettingManager, credentials, - null); + eventBus); searchService = new LuceneSearch(zConf, notebook); QuartzSchedulerService schedulerService = new QuartzSchedulerService(zConf, notebook); notebook.initNotebook(); diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index 288f70051d3..78d7da877a3 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -291,6 +291,12 @@ org.apache.hadoop hadoop-client-runtime + + + io.reactivex.rxjava3 + rxjava + 3.1.10 + diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/EventBus.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/EventBus.java new file mode 100644 index 00000000000..ccb1a3952f9 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/EventBus.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.event; + +import io.reactivex.rxjava3.core.Observable; + +public interface EventBus { + + void post(Object event); + + Observable observe(Class eventType); +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/NoOpEventBus.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/NoOpEventBus.java new file mode 100644 index 00000000000..78a4198e176 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/NoOpEventBus.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.event; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.reactivex.rxjava3.core.Observable; +import jakarta.inject.Inject; + +public class NoOpEventBus implements EventBus { + + private static final Logger LOGGER = LoggerFactory.getLogger(NoOpEventBus.class); + + @Inject + public NoOpEventBus() { + LOGGER.info("Starting NoOpEventBus"); + } + + @Override + public void post(Object event) { + LOGGER.debug("Posting event: {}", event.getClass().getName()); + } + + @Override + public io.reactivex.rxjava3.core.Observable observe(Class eventType) { + return Observable.empty(); + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/NoteEvent.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/NoteEvent.java new file mode 100644 index 00000000000..2601d13a91f --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/NoteEvent.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.event; + +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.user.AuthenticationInfo; + +public interface NoteEvent { + + Note getNote(); + + AuthenticationInfo getSubject(); +} \ No newline at end of file diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/NoteRemoveEvent.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/NoteRemoveEvent.java new file mode 100644 index 00000000000..171cc7b8ca9 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/NoteRemoveEvent.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.event; + +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.user.AuthenticationInfo; + +public class NoteRemoveEvent implements NoteEvent { + + private Note note; + + private AuthenticationInfo subject; + + public NoteRemoveEvent(Note note, AuthenticationInfo subject) { + this.note = note; + this.subject = subject; + } + + @Override + public Note getNote() { + return this.note; + } + + @Override + public AuthenticationInfo getSubject() { + return subject; + } +} \ No newline at end of file diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/ZeppelinEventBus.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/ZeppelinEventBus.java new file mode 100644 index 00000000000..782705cac8f --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/ZeppelinEventBus.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.event; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.subjects.PublishSubject; +import io.reactivex.rxjava3.subjects.Subject; +import jakarta.inject.Inject; + +public class ZeppelinEventBus implements EventBus { + + private static final Logger LOGGER = LoggerFactory.getLogger(ZeppelinEventBus.class); + + private final Subject eventBus; + + @Inject + public ZeppelinEventBus() { + LOGGER.info("Starting ZeppelinEventBus"); + + eventBus = PublishSubject.create(); + } + + @Override + public void post(Object event) { + LOGGER.debug("Posting event: {}", event.getClass().getName()); + + eventBus.onNext(event); + } + + @Override + public Observable observe(Class eventType) { + LOGGER.debug("Observing event: {}", eventType.getName()); + + return eventBus.ofType(eventType); + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 713cc793223..57551083f13 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -44,6 +44,8 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.event.EventBus; +import org.apache.zeppelin.event.NoteRemoveEvent; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterGroup; @@ -65,6 +67,7 @@ import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.PreDestroy; /** * High level api of Notebook related operations, such as create, move & delete note/folder. @@ -86,11 +89,13 @@ public class Notebook { private Credentials credentials; private final List> initConsumers; private ExecutorService initExecutor; + private EventBus eventBus; /** * Main constructor \w manual Dependency Injection * * @throws IOException + * * @throws SchedulerException */ public Notebook( @@ -100,7 +105,8 @@ public Notebook( NoteManager noteManager, InterpreterFactory replFactory, InterpreterSettingManager interpreterSettingManager, - Credentials credentials) + Credentials credentials, + EventBus eventBus) { this.zConf = zConf; this.authorizationService = authorizationService; @@ -111,6 +117,7 @@ public Notebook( // TODO(zjffdu) cycle refer, not a good solution this.interpreterSettingManager.setNotebook(this); this.credentials = credentials; + this.eventBus = eventBus; addNotebookEventListener(this.interpreterSettingManager); initConsumers = new LinkedList<>(); } @@ -220,7 +227,9 @@ public Notebook( InterpreterFactory replFactory, InterpreterSettingManager interpreterSettingManager, Credentials credentials, - NoteEventListener noteEventListener) + NoteEventListener noteEventListener, + EventBus eventBus + ) throws IOException { this( zConf, @@ -229,7 +238,8 @@ public Notebook( noteManager, replFactory, interpreterSettingManager, - credentials); + credentials, + eventBus); if (null != noteEventListener) { addNotebookEventListener(noteEventListener); } @@ -425,14 +435,16 @@ public String cloneNote(String sourceNoteId, String revisionId, String newNotePa }); } - private void removeNote(Note note, AuthenticationInfo subject) throws IOException { - LOGGER.info("Remove note: {}", note.getId()); - // Set Remove to true to cancel saving this note - note.setRemoved(true); - noteManager.removeNote(note.getId(), subject); - authorizationService.removeNoteAuth(note.getId()); - fireNoteRemoveEvent(note, subject); - } + private void removeNote(Note note, AuthenticationInfo subject) throws IOException { + LOGGER.info("Remove note: {}", note.getId()); + // Set Remove to true to cancel saving this note + note.setRemoved(true); + noteManager.removeNote(note.getId(), subject); + authorizationService.removeNoteAuth(note.getId()); + + fireNoteRemoveEvent(note, subject); + eventBus.post(new NoteRemoveEvent(note, subject)); + } public void removeCorruptedNote(String noteId, AuthenticationInfo subject) throws IOException { LOGGER.info("Remove corrupted note: {}", noteId); @@ -831,6 +843,7 @@ private void fireNoteUpdateEvent(Note note, AuthenticationInfo subject) { } } + private void fireNoteRemoveEvent(Note note, AuthenticationInfo subject) { for (NoteEventListener listener : noteEventListeners) { listener.onNoteRemove(note, subject); @@ -865,4 +878,4 @@ public interface NoteProcessor { T process(Note note) throws IOException; } -} +} \ No newline at end of file diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/event/RxJavaTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/event/RxJavaTest.java new file mode 100644 index 00000000000..d507c872a8f --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/event/RxJavaTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.event; + +import io.reactivex.rxjava3.core.Observable; +import org.junit.jupiter.api.Test; + +class RxJavaTest { + + @Test + void testObservable() { + Observable observable = Observable.just("Hello", "RxJava", "Test"); + + observable.test() + .assertValues("Hello", "RxJava", "Test") + .assertComplete() + .assertNoErrors(); + } +} diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/event/ZeppelinEventBusTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/event/ZeppelinEventBusTest.java new file mode 100644 index 00000000000..1a07d115339 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/event/ZeppelinEventBusTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.event; + +import io.reactivex.rxjava3.disposables.Disposable; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class MockEvent { + String payload; + + public MockEvent(String payload) { + this.payload = payload; + } +} + +class Publisher { + private final ZeppelinEventBus eventBus; + + public Publisher(ZeppelinEventBus eventBus) { + this.eventBus = eventBus; + } + + public void createNote(String noteId) { + eventBus.post(new MockEvent(noteId)); + } +} + +class Subscriber { + List collection = new ArrayList<>(); + + Disposable disposable; + + public Subscriber(ZeppelinEventBus eventBus) { + this.disposable = eventBus.observe(MockEvent.class) + .subscribe(event -> { + String payload = event.payload; + collection.add(payload); + System.out.println("EventSubscriber: event received, payload: " + payload); + }); + } + + public void stopListening() { + if (disposable != null && !disposable.isDisposed()) { + disposable.dispose(); + } + } +} + +class ZeppelinEventBusTest { + @Test + void testEventFlowFromPublisherToSubscriber() throws InterruptedException { + // Given + var bus = new ZeppelinEventBus(); + + var publisher = new Publisher(bus); + var subscriber = new Subscriber(bus); + + // When + String payload = "data"; + publisher.createNote(payload); + + Thread.sleep(100); + + // Then + List received = subscriber.collection; + + assertEquals(1, received.size()); + assertEquals(payload, received.get(0)); + assertTrue(received.contains(payload)); + + // Cleanup + subscriber.stopListening(); + } +} \ No newline at end of file diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java index 2f356281ab7..d66662ebfad 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java @@ -70,7 +70,8 @@ public void setUp() throws Exception { new NoteManager(notebookRepo, ZeppelinConfiguration.load()), interpreterFactory, interpreterSettingManager, - new Credentials()); + new Credentials(), + eventBus); heliumAppFactory = new HeliumApplicationFactory(notebook, null); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java index 0bfd19014b7..8a940222506 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java @@ -21,6 +21,7 @@ import org.apache.commons.io.FileUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.event.ZeppelinEventBus; import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.notebook.AuthorizationService; @@ -65,6 +66,7 @@ public abstract class AbstractInterpreterTest { protected ZeppelinConfiguration zConf; protected ConfigStorage storage; protected PluginManager pluginManager; + protected ZeppelinEventBus eventBus; @BeforeEach public void setUp() throws Exception { @@ -89,6 +91,7 @@ public void setUp() throws Exception { FileUtils.copyDirectory(new File("src/test/resources/conf"), confDir); zConf = ZeppelinConfiguration.load(); + zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_EVENTBUS_ENABLED.getVarName(), "true"); zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), zeppelinHome.getAbsolutePath()); zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), @@ -99,13 +102,14 @@ public void setUp() throws Exception { notebookDir.getAbsolutePath()); zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT.getVarName(), "test"); - + zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_EVENTBUS_ENABLED.getVarName(), "true"); NotebookRepo notebookRepo = new InMemoryNotebookRepo(); NoteManager noteManager = new NoteManager(notebookRepo, zConf); noteParser = new GsonNoteParser(zConf); storage = ConfigStorage.createConfigStorage(zConf); pluginManager = new PluginManager(zConf); + eventBus = new ZeppelinEventBus(); AuthorizationService authorizationService = new AuthorizationService(noteManager, zConf, storage); @@ -114,7 +118,8 @@ public void setUp() throws Exception { mock(ApplicationEventListener.class), storage, pluginManager); interpreterFactory = new InterpreterFactory(interpreterSettingManager); Credentials credentials = new Credentials(zConf, storage); - notebook = new Notebook(zConf, authorizationService, notebookRepo, noteManager, interpreterFactory, interpreterSettingManager, credentials); + ZeppelinEventBus eventBus = new ZeppelinEventBus(); + notebook = new Notebook(zConf, authorizationService, notebookRepo, noteManager, interpreterFactory, interpreterSettingManager, credentials, eventBus); interpreterSettingManager.setNotebook(notebook); } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index c0bdcc116de..1d0e3aeb9b1 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -21,6 +21,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.event.ZeppelinEventBus; import org.apache.zeppelin.interpreter.AbstractInterpreterTest; import org.apache.zeppelin.interpreter.ExecutionContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -109,7 +110,8 @@ public void setUp() throws Exception { authorizationService = new AuthorizationService(noteManager, zConf, storage); credentials = new Credentials(zConf, storage); - notebook = new Notebook(zConf, authorizationService, notebookRepo, noteManager, interpreterFactory, interpreterSettingManager, credentials, null); + ZeppelinEventBus eventBus = new ZeppelinEventBus(); + notebook = new Notebook(zConf, authorizationService, notebookRepo, noteManager, interpreterFactory, interpreterSettingManager, credentials, eventBus); notebook.setParagraphJobListener(this); schedulerService = new QuartzSchedulerService(zConf, notebook); notebook.initNotebook(); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java index 539a160dcbb..d1aa7fb13ba 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java @@ -31,6 +31,8 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.event.EventBus; +import org.apache.zeppelin.event.ZeppelinEventBus; import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterSettingManager; @@ -68,6 +70,7 @@ class NotebookRepoSyncTest { private InterpreterFactory factory; private InterpreterSettingManager interpreterSettingManager; private Credentials credentials; + private EventBus eventBus; private AuthenticationInfo anonymous; private NoteManager noteManager; private AuthorizationService authorizationService; @@ -88,6 +91,7 @@ public void setUp() throws Exception { zConf = ZeppelinConfiguration.load(); noteParser = new GsonNoteParser(zConf); storage = ConfigStorage.createConfigStorage(zConf); + zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_EVENTBUS_ENABLED.getVarName(), "true"); zConf.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), zeppelinHome.getAbsolutePath()); zConf.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath()); zConf.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.VFSNotebookRepo,org.apache.zeppelin.notebook.repo.mock.VFSNotebookRepoMock"); @@ -108,7 +112,8 @@ public void setUp() throws Exception { noteManager = new NoteManager(notebookRepoSync, zConf); authorizationService = new AuthorizationService(noteManager, zConf, storage); credentials = new Credentials(zConf, storage); - notebook = new Notebook(zConf, authorizationService, notebookRepoSync, noteManager, factory, interpreterSettingManager, credentials, null); + eventBus = new ZeppelinEventBus(); + notebook = new Notebook(zConf, authorizationService, notebookRepoSync, noteManager, factory, interpreterSettingManager, credentials, eventBus); anonymous = new AuthenticationInfo("anonymous"); }