Alexander Fedulov

Apache Zeppelin Notebooks Export

Apache Zeppelin is a web-based notebook for interactive data analysis. It offers functionality similar to Jupyter (formerly IPython), but what is cool about it is that it lets you use Scala instead of Python to work with Spark interactively. This makes it a very handy tool if you want to quickly test code but do not want to go through the pain of the sbt assembly + ./bin/spark-submit development cycle (by the way, check out a very interesting post by my colleague Artur Mkrtchyan about his findings on a hidden Spark REST API as an alternative to spark-submit).

As for Apache Zeppelin, as of now it unfortunately does not support exporting notebooks. Chances are you will need this feature at some point, whether to share your notebooks with colleagues, migrate from one machine to another, or put your work under version control.

Luckily, under the hood, saving and importing notebooks is quite transparent, and you can do it with just a few commands. So, without further ado, let's see how exactly we can do a manual export.

Notebooks are located in folders with random names in the Zeppelin notebook directory.

ls /opt/zeppelin/notebook/  
2A94M5J1Y  2A94M5J1Z  2AZU1YEZE  2B3D826UD  

A notebook's entire definition, including the source code, is stored in a single file called note.json.

ls /opt/zeppelin/notebook/2A94M5J1Y/  
note.json  

In order to export a notebook to another machine, just copy its folder into the notebook directory of another Zeppelin installation.

There is just one more thing you need to do though. Open the interpreter configuration file:

vim /opt/zeppelin/conf/interpreter.json  

you will see a section at the end called interpreterBindings. Add a new entry with the ID of your imported notebook (it should be the same as the folder name; otherwise check the note.json file) and associate it with the IDs of existing interpreters.

"interpreterBindings": {
    "2B3D826UD": [
      "2AZN2E1JE",
      ...
      "2B219R99U",
      "2B46SWGGN"
    ],
    "2A94M5J1Y": [   <---- imported notebook
      "2AZN2E1JE",
      ...
      "2B219R99U",
      "2B46SWGGN"
    ]
}

Now restart Zeppelin:

/opt/zeppelin/bin/zeppelin-daemon.sh stop
/opt/zeppelin/bin/zeppelin-daemon.sh start

That's it. You should now be able to access and run your imported notebook!

Dynamic DataSource Routing with Spring @Transactional

It is often desirable to distribute requests between multiple physical instances of SQL databases depending on the semantics of the executed query. In the simplest and most typical scenario, we want to keep writes (INSERT/UPDATE) flowing unhindered to the master DB by offloading heavy SELECT queries to the replicas.

In this blog post I will show how to achieve this conveniently by using custom annotations on top of Spring's transactional layer.

Spring provides a variation of DataSource called AbstractRoutingDataSource. It can be used in place of standard DataSource implementations and provides a mechanism to determine, at runtime, which concrete DataSource to use for each operation. All you need to do is extend it and provide an implementation of the abstract determineCurrentLookupKey method. This is the place to implement your custom logic for picking the concrete DataSource. The returned Object serves as a lookup key; it is typically a String or an Enum used as a qualifier in the Spring configuration (details will follow).

package website.fedulov.routing;

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class RoutingDataSource extends AbstractRoutingDataSource {  
    @Override
    protected Object determineCurrentLookupKey() {
        return DbContextHolder.getDbType();
    }
}

You might be wondering what that DbContextHolder object is and how it knows which DataSource identifier to return. Keep in mind that the determineCurrentLookupKey method will be called whenever the TransactionManager requests a connection. It is important to remember that each transaction is "associated" with a separate thread; more precisely, the TransactionManager binds the Connection to the current thread. Therefore, in order to dispatch different transactions to different target DataSources, we have to make sure that every thread can reliably identify which DataSource it is supposed to use. This makes it natural to use a ThreadLocal variable to bind a specific DataSource to a thread, and hence to a transaction. This is how it is done:

public enum DbType {  
   MASTER,
   REPLICA1,
}

public class DbContextHolder {

   private static final ThreadLocal<DbType> contextHolder = new ThreadLocal<DbType>();

   public static void setDbType(DbType dbType) {
       if(dbType == null){
           throw new NullPointerException();
       }
      contextHolder.set(dbType);
   }

   public static DbType getDbType() {
      return contextHolder.get();
   }

   public static void clearDbType() {
      contextHolder.remove();
   }
}
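Since the whole routing mechanism hinges on ThreadLocal isolation, here is a tiny standalone demo (class and method names are invented for illustration) showing that each thread sees only its own binding:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// A standalone sketch: every thread owns a private slot in the ThreadLocal,
// so DataSource bindings cannot leak between the threads that the
// TransactionManager hands connections to.
public class ThreadLocalIsolationDemo {

    enum DbType { MASTER, REPLICA1 }

    private static final ThreadLocal<DbType> contextHolder = new ThreadLocal<DbType>();

    // Binds MASTER on the calling thread, then asks a worker thread what it
    // sees. Returns the worker's view, which is null: the binding made on
    // the calling thread is invisible to the worker.
    static DbType bindAndPeekFromWorker() throws Exception {
        contextHolder.set(DbType.MASTER);            // binding for this thread only
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<DbType> workerView = pool.submit(() -> contextHolder.get());
            return workerView.get();                 // null: worker has no binding
        } finally {
            pool.shutdown();
            contextHolder.remove();                  // always clean up, as in the post
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("worker saw: " + bindAndPeekFromWorker());
    }
}
```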

As you can see, you can also use an enum as the key, and Spring will resolve it correctly based on its name. The associated DataSource configuration and keys might look like this:

   ....
<bean id="dataSource" class="website.fedulov.routing.RoutingDataSource">  
 <property name="targetDataSources">
   <map key-type="website.fedulov.routing.DbType">
     <entry key="MASTER" value-ref="dataSourceMaster"/>
     <entry key="REPLICA1" value-ref="dataSourceReplica"/>
   </map>
 </property>
 <property name="defaultTargetDataSource" ref="dataSourceMaster"/>
</bean>

<bean id="dataSourceMaster" class="org.apache.commons.dbcp.BasicDataSource">  
  <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
  <property name="url" value="${db.master.url}"/>
  <property name="username" value="${db.username}"/>
  <property name="password" value="${db.password}"/>
</bean>  
<bean id="dataSourceReplica" class="org.apache.commons.dbcp.BasicDataSource">  
  <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
  <property name="url" value="${db.replica.url}"/>
  <property name="username" value="${db.username}"/>
  <property name="password" value="${db.password}"/>
</bean>  
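If you prefer Java configuration over XML, an equivalent wiring might look like the sketch below. The bean names mirror the XML bean ids above; this assumes the RoutingDataSource and DbType classes defined earlier are on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RoutingConfig {

    @Bean
    public DataSource dataSource(DataSource dataSourceMaster, DataSource dataSourceReplica) {
        RoutingDataSource routing = new RoutingDataSource();
        // Map lookup keys (our enum) to the concrete target DataSources
        Map<Object, Object> targets = new HashMap<Object, Object>();
        targets.put(DbType.MASTER, dataSourceMaster);
        targets.put(DbType.REPLICA1, dataSourceReplica);
        routing.setTargetDataSources(targets);
        routing.setDefaultTargetDataSource(dataSourceMaster);
        return routing;
    }
}
```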

We are almost there. At this point you might find yourself doing something like this:

@Service
public class BookService {

  private final BookRepository bookRepository;
  private final Mapper mapper;

  @Inject
  public BookService(BookRepository bookRepository, Mapper mapper) {
    this.bookRepository = bookRepository;
    this.mapper = mapper;
  }

  @Transactional(readOnly = true)
  public Page<BookDTO> getBooks(Pageable p) {
    DbContextHolder.setDbType(DbType.REPLICA1);   // <----- set ThreadLocal DataSource lookup key
                                                  // all connection from here will go to REPLICA1
    Page<Book> booksPage = bookRepository.findAll(p);
    List<BookDTO> pContent = CollectionMapper.map(mapper, booksPage.getContent(), BookDTO.class);
    DbContextHolder.clearDbType();               // <----- clear ThreadLocal setting
    return new PageImpl<BookDTO>(pContent, p, booksPage.getTotalElements());
  }

  ...//other methods
}

Now we can control which DataSource will be used and forward requests as we please. Looks good!

...Or does it? First of all, those static method calls to a magical DbContextHolder really stick out. They look like they do not belong in the business logic. And they don't. Not only do they fail to communicate their purpose, they are fragile and error-prone (what if you forget to clear the dbType?). And what if an exception is thrown between setDbType and clearDbType? We cannot just ignore it. We need to be absolutely sure that we reset the dbType; otherwise a thread returned to the thread pool might be left in a "broken" state, trying to write to a replica on the next call. So we need this:

  @Transactional(readOnly = true)
  public Page<BookDTO> getBooks(Pageable p) {
    Page<Book> booksPage;
    List<BookDTO> pContent;
    try {
      DbContextHolder.setDbType(DbType.REPLICA1);   // <----- set ThreadLocal DataSource lookup key
                                                    // all connections from here will go to REPLICA1
      booksPage = bookRepository.findAll(p);
      pContent = CollectionMapper.map(mapper, booksPage.getContent(), BookDTO.class);
    } finally {
      DbContextHolder.clearDbType();               // <----- make sure ThreadLocal setting is cleared
    }
    return new PageImpl<BookDTO>(pContent, p, booksPage.getTotalElements());
  }

Yikes >_< ! This is definitely not something I would like to put into every read-only method. Can we do better? Of course! This pattern of "do something at the beginning of a method, then do something at the end" should ring a bell. Aspects to the rescue!

Let's define a neat little annotation with a meaningful name:

import java.lang.annotation.ElementType;  
import java.lang.annotation.Retention;  
import java.lang.annotation.RetentionPolicy;  
import java.lang.annotation.Target;

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnlyConnection {

}

And an interceptor for it:

import org.aspectj.lang.ProceedingJoinPoint;  
import org.aspectj.lang.annotation.Around;  
import org.aspectj.lang.annotation.Aspect;  
import org.aspectj.lang.annotation.Pointcut;  
import org.springframework.beans.factory.annotation.Value;  
import org.springframework.core.Ordered;  
import org.springframework.stereotype.Component;

@Aspect
@Component
public class ReadOnlyConnectionInterceptor implements Ordered {

    private int order;

    @Value("20")
    public void setOrder(int order) {
        this.order = order;
    }

    @Override
    public int getOrder() {
       return order;
    }

    @Pointcut("execution(public * *(..))")
    public void anyPublicMethod() { }

    @Around("anyPublicMethod() && @annotation(readOnlyConnection)")
    public Object proceed(ProceedingJoinPoint pjp, ReadOnlyConnection readOnlyConnection) throws Throwable {
        try {
            DbContextHolder.setDbType(DbType.REPLICA1);
            return pjp.proceed();
        } finally {
            // make sure the ThreadLocal is cleared, even if an exception was thrown
            DbContextHolder.clearDbType();
        }
    }
}

This pointcut ensures that any public method annotated with @ReadOnlyConnection will get the DbType.REPLICA1 DataSource set under the hood and cleared on method exit (even if an exception is thrown).

  @ReadOnlyConnection
  @Transactional(readOnly = true)
  public Page<BookDTO> getBooks(Pageable p) {
    Page<Book> booksPage = bookRepository.findAll(p);
    List<BookDTO> pContent = CollectionMapper.map(mapper, booksPage.getContent(), BookDTO.class);
    return new PageImpl<BookDTO>(pContent, p, booksPage.getTotalElements());
  }

This looks much better. The business logic does not get cluttered with unrelated cross-cutting concerns, and the annotation name directly communicates that the method is designed to use our read-only data source. The last bit to clarify is the magical @Value("20"). It sets the order parameter of our interceptor. The thing is, we need to make sure that the DataSource type is set before the @Transactional annotation kicks in; otherwise the connection will already be bound to the thread by the time our @ReadOnlyConnection gets processed. So basically we need to set our aspect's order below the order of the transaction advice (20 < 100). You can use this configuration:

<tx:annotation-driven order="100"/>
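If you are on Java configuration rather than XML, the same ordering can be expressed through the order attribute of Spring's @EnableTransactionManagement annotation. A sketch (the class name is made up; adjust to your own configuration class):

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.transaction.annotation.EnableTransactionManagement;

// Transaction advice at order 100 runs after our order-20 interceptor,
// so the DataSource lookup key is already set when the connection is bound.
@Configuration
@EnableAspectJAutoProxy
@EnableTransactionManagement(order = 100)
public class TxConfig {
}
```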