Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/zeppelin-site.xml.template
Original file line number Diff line number Diff line change
Expand Up @@ -824,4 +824,10 @@
<description>fields to be excluded from being saved in note files, with Paragraph prefix mean the fields in Paragraph, e.g. Paragraph.results</description>
</property>

<property>
<name>zeppelin.eventbus.enabled</name>
<value>false</value>
<description>Enables the new event-driven architecture using an in-process EventBus</description>
</property>

</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,8 @@ public boolean isPrometheusMetricEnabled() {
return getBoolean(ConfVars.ZEPPELIN_METRIC_ENABLE_PROMETHEUS);
}

public boolean isEventBusEnabled() { return getBoolean(ConfVars.ZEPPELIN_EVENTBUS_ENABLED); }

public DEFAULT_UI getDefaultUi() {
return DEFAULT_UI.valueOf(getString(ConfVars.ZEPPELIN_DEFAULT_UI).toUpperCase());
}
Expand Down Expand Up @@ -1131,7 +1133,8 @@ public enum ConfVars {
ZEPPELIN_SPARK_ONLY_YARN_CLUSTER("zeppelin.spark.only_yarn_cluster", false),
ZEPPELIN_SESSION_CHECK_INTERVAL("zeppelin.session.check_interval", 60 * 10 * 1000),
ZEPPELIN_NOTE_CACHE_THRESHOLD("zeppelin.note.cache.threshold", 50),
ZEPPELIN_NOTE_FILE_EXCLUDE_FIELDS("zeppelin.note.file.exclude.fields", "");
ZEPPELIN_NOTE_FILE_EXCLUDE_FIELDS("zeppelin.note.file.exclude.fields", ""),
ZEPPELIN_EVENTBUS_ENABLED("zeppelin.eventbus.enabled", false);

private String varName;
private Class<?> varClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.conf.ZeppelinConfiguration.DEFAULT_UI;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.event.EventBus;
import org.apache.zeppelin.event.NoOpEventBus;
import org.apache.zeppelin.event.ZeppelinEventBus;
import org.apache.zeppelin.healthcheck.HealthChecks;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.helium.Helium;
Expand Down Expand Up @@ -178,6 +181,11 @@ protected void configure() {
bind(storage).to(ConfigStorage.class);
bindAsContract(PluginManager.class).in(Singleton.class);
bind(GsonNoteParser.class).to(NoteParser.class).in(Singleton.class);
if (zConf.isEventBusEnabled()) {
bind(ZeppelinEventBus.class).to(EventBus.class).in(Singleton.class);
} else {
bind(NoOpEventBus.class).to(EventBus.class).in(Singleton.class);
}
bindAsContract(InterpreterFactory.class).in(Singleton.class);
bindAsContract(NotebookRepoSync.class).to(NotebookRepo.class).in(Singleton.class);
bindAsContract(Helium.class).in(Singleton.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.disposables.Disposable;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.websocket.CloseReason;
Expand All @@ -62,6 +64,9 @@
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.event.EventBus;
import org.apache.zeppelin.event.NoteEvent;
import org.apache.zeppelin.event.NoteRemoveEvent;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.InterpreterGroup;
Expand Down Expand Up @@ -156,6 +161,8 @@ String getKey() {
private AuthorizationService authorizationService;
private Provider<ConfigurationService> configurationServiceProvider;
private Provider<JobManagerService> jobManagerServiceProvider;
private Disposable disposable;


public NotebookServer() {
NotebookServer.self.set(this);
Expand All @@ -167,6 +174,25 @@ public void setZeppelinConfiguration(ZeppelinConfiguration zConf) {
this.zConf = zConf;
}

@Inject
public void registerEventBus(EventBus eventBus, ZeppelinConfiguration zConf) {
if (!zConf.isEventBusEnabled()) {
LOGGER.debug("ZeppelinEventBus is disabled");
return;
}

this.disposable = eventBus.observe(NoteEvent.class)
.subscribe(this::handleNoteEvent);
}

@PreDestroy
public void cleanup() {
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
}
executorService.shutdown();
}

@Inject
public void setNoteParser(Provider<NoteParser> noteParser) {
this.noteParser = noteParser;
Expand Down Expand Up @@ -1876,19 +1902,12 @@ public void onParagraphRemove(Paragraph p) {

@Override
public void onNoteRemove(Note note, AuthenticationInfo subject) {
try {
broadcastUpdateNoteJobInfo(note, System.currentTimeMillis() - 5000);
} catch (IOException e) {
LOGGER.warn("can not broadcast for job manager: {}", e.getMessage(), e);
}

try {
getJobManagerService().removeNoteJobInfo(note.getId(), null,
new JobManagerServiceCallback());
} catch (IOException e) {
LOGGER.warn("can not broadcast for job manager: {}", e.getMessage(), e);
if (zConf.isEventBusEnabled()) {
LOGGER.debug("ZeppelinEventBus is enabed");
return;
}

handleNoteRemove(note);
}

@Override
Expand Down Expand Up @@ -2330,4 +2349,28 @@ public void onFailure(Exception ex, ServiceContext context) throws IOException {
}
}
}

private void handleNoteEvent(NoteEvent event) {
if (event instanceof NoteRemoveEvent) {
Note note = event.getNote();
handleNoteRemove(note);
} else {
LOGGER.warn("Unknown event type: {}", event.getClass().getName());
}
}

private void handleNoteRemove(Note note) {
try {
broadcastUpdateNoteJobInfo(note, System.currentTimeMillis() - 5000);
} catch (IOException e) {
LOGGER.warn("can not broadcast for job manager: {}", e.getMessage(), e);
}

try {
getJobManagerService().removeNoteJobInfo(note.getId(), null,
new JobManagerServiceCallback());
} catch (IOException e) {
LOGGER.warn("can not broadcast for job manager: {}", e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public MiniZeppelinServer(String classname, String zeppelinConfiguration) throws
zConf = ZeppelinConfiguration.load(zeppelinConfiguration);
zConf.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(),
zeppelinHome.getAbsoluteFile().toString());
zConf.setProperty(ConfVars.ZEPPELIN_EVENTBUS_ENABLED.getVarName(), "true");
Optional<File> webWar = getWebWar();
Optional<File> webAngularWar = getWebAngularWar();
if (webWar.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.event.EventBus;
import org.apache.zeppelin.event.ZeppelinEventBus;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.Interpreter.FormType;
import org.apache.zeppelin.interpreter.InterpreterFactory;
Expand Down Expand Up @@ -102,6 +104,7 @@ void setUp(TestInfo testInfo) throws Exception {
ZeppelinConfiguration zConf = ZeppelinConfiguration.load();
zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(),
notebookDir.getAbsolutePath());
zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_EVENTBUS_ENABLED.getVarName(), "true");
// enable cron for testNoteUpdate method
if ("testNoteUpdate()".equals(testInfo.getDisplayName())){
confDir = Files.createTempDirectory("confDir").toAbsolutePath().toFile();
Expand Down Expand Up @@ -138,6 +141,7 @@ void setUp(TestInfo testInfo) throws Exception {
NoteManager noteManager = new NoteManager(notebookRepo, zConf);
AuthorizationService authorizationService =
new AuthorizationService(noteManager, zConf, storage);
EventBus eventBus = new ZeppelinEventBus();
notebook =
new Notebook(
zConf,
Expand All @@ -147,7 +151,7 @@ void setUp(TestInfo testInfo) throws Exception {
mockInterpreterFactory,
mockInterpreterSettingManager,
credentials,
null);
eventBus);
searchService = new LuceneSearch(zConf, notebook);
QuartzSchedulerService schedulerService = new QuartzSchedulerService(zConf, notebook);
notebook.initNotebook();
Expand Down
6 changes: 6 additions & 0 deletions zeppelin-zengine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</dependency>

<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.1.10</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.event;

import io.reactivex.rxjava3.core.Observable;

public interface EventBus {

void post(Object event);

<T> Observable<T> observe(Class<T> eventType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.event;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.reactivex.rxjava3.core.Observable;
import jakarta.inject.Inject;

public class NoOpEventBus implements EventBus {

private static final Logger LOGGER = LoggerFactory.getLogger(NoOpEventBus.class);

@Inject
public NoOpEventBus() {
LOGGER.info("Starting NoOpEventBus");
}

@Override
public void post(Object event) {
LOGGER.debug("Posting event: {}", event.getClass().getName());
}

@Override
public <T> io.reactivex.rxjava3.core.Observable<T> observe(Class<T> eventType) {
return Observable.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.event;

import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.user.AuthenticationInfo;

public interface NoteEvent {

Note getNote();

AuthenticationInfo getSubject();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.event;

import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.user.AuthenticationInfo;

public class NoteRemoveEvent implements NoteEvent {

private Note note;

private AuthenticationInfo subject;

public NoteRemoveEvent(Note note, AuthenticationInfo subject) {
this.note = note;
this.subject = subject;
}

@Override
public Note getNote() {
return this.note;
}

@Override
public AuthenticationInfo getSubject() {
return subject;
}
}
Loading
Loading