Java Example to Push Website Access Logs to Cassandra DB

Redis cluster with Docker

Cassandra DB is one of the leading NoSQL database system as it can be very easily configured in scalable cluster in bot on-prime and cloud environment. Most of the leading cloud providers now providing it as managed or serverless service. It is a column based NoSQL and provides very fast write operation.

Cassandra has drivers and a framework for almost every development platform and programming language. You can check officially supported driver pages.

In this post, I will show you how you can use the Datastax Java driver to process website access log files and push data into Cassandra DB.

Problem Statement

You have daily website access logs in a folder generated by the NGINX web server. There is more than one file and each file has thousands of rows. You need to develop a Java program to process these log files and insert records to the Cassandra DB.

This program should be multi-threaded means more than one files should be processed concurrently.

We just need to pass the folder path containing the log files and number of threads as a command-line argument to the program.

Here is the log sample line, each file contains logs in line

157.42.230.57 - - [04/Jun/2021:06:25:08 +0000] "GET /medicine/energic-31/903 HTTP/2.0" 200 64127 "https://www.google.com/" "Mozilla/5.0 (Linux; Android 10; RMX2027) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.210 Mobile Safari/537.36"

Java and Cassandra solution

If you do not have Cassandra setup then you can read here

Creating Cassandra Keyspace and Table

For Storing in Cassandra DB we need to define Keyspace and Table with columns. You can think of Cassandra Keyspace as RDS schema and Table as RDS table with rows but not fixed columns.

Using cqlsh create keyspace named accesslogdb with replication factor 1, you change it to your requirement.

CREATE KEYSPACE accesslogdb
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

For storing logs in Keyspace table we need to define a Cassandra table with following.

CREATE TABLE accesslogdb.access_log
(id uuid PRIMARY KEY, ip varchar, time timestamp, method varchar, url varchar,
status int, length bigint, referral varchar, client varchar );

Now our Kespace and table are ready, you can verify using cqlsh.

cqlsh> select * from accesslogdb.access_log;

 id | client | ip | length | method | referral | status | time | url
----+--------+----+--------+--------+----------+--------+------+-----

(0 rows)

Development of Java Program with Cassandra DB

For connecting the java program to Cassandra DB I will use Datastax Java driver and for IDE I am using IntelliJ Idea and Gradle as build system.

Create one Java and Gradle project in IntelliJ after project creation add following dependacy

dependencies {
    // https://mvnrepository.com/artifact/com.datastax.oss/java-driver-core
    implementation group: 'com.datastax.oss', name: 'java-driver-core', version: '4.11.1'
    // https://mvnrepository.com/artifact/com.datastax.oss/java-driver-query-builder
    implementation group: 'com.datastax.oss', name: 'java-driver-query-builder', version: '4.11.1'
    // https://mvnrepository.com/artifact/com.datastax.oss/java-driver-mapper-runtime
    implementation group: 'com.datastax.oss', name: 'java-driver-mapper-runtime', version: '4.11.1'
    annotationProcessor group: 'com.datastax.oss', name: 'java-driver-mapper-processor', version: '4.11.1'
    // https://mvnrepository.com/artifact/commons-io/commons-io
    implementation group: 'commons-io', name: 'commons-io', version: '2.9.0'
    
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
}

I added Datastax Java lib java-driver-core, java-driver-query-builder and java-driver-mapper-runtime. You refer datastax/java-driver: DataStax Java Driver for Apache Cassandra (github.com) for details of these libraries.

Core driver: The core module handles cluster connectivity and request execution.

Query builder: The query builder is a utility to generate CQL queries programmatically.

Mapper: The mapper generates the boilerplate to execute queries and convert the results into application-level objects.

Now we need to define one Entity class for the Cassandra DB table. I added the id field of type UUID for the partition key in AccessLog entity class.

@Entity
public class AccessLog {
    @PartitionKey
    private UUID id;
    private String ip;
    private Instant time;
    private String method;
    private String url;
    private int status;
    private long length;
    private String referral;
    private String client;

    public AccessLog() {
        this.id = UUID.randomUUID();
    }
    public UUID getId() {
        return id;
    }
    public void setId(UUID id) {
        this.id = id;
    }
    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public Instant getTime() {
        return time;
    }
    public void setTime(Instant time) {
        this.time = time;
    }
    public String getMethod() {
        return method;
    }
    public void setMethod(String method) {
        this.method = method;
    }
    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    public int getStatus() {
        return status;
    }
    public void setStatus(int status) {
        this.status = status;
    }
    public long getLength() {
        return length;
    }
    public void setLength(long length) {
        this.length = length;
    }
    public String getReferral() {
        return referral;
    }
    public void setReferral(String referral) {
        this.referral = referral;
    }
    public String getClient() {
        return client;
    }
    public void setClient(String client) {
        this.client = client;
    }
}

Now we need to define one Dao interface for operation with Cassandra. @Dao defines some set of Cassandra query methods.

@Dao
public interface AccessLogDao {
    @Select
    AccessLog findById(UUID id);

    @Insert
    void save(AccessLog log);

    @Delete
    void delete(AccessLog log);
}

Now we need to define a Mapper interface using @Mapper interface

@Mapper
public interface AccessLogMapper {
    @DaoFactory
    AccessLogDao accessLogDao(@DaoKeyspace CqlIdentifier keyspace);
}

The mapper annotation processor will generate an implementation and a builder that allows you to create an instance from a CqlSession.

By default, the builder’s name is the name of the interface with the suffix “Builder”, and it resides in the same package. You can also use a custom name with builderName().
The interface should define one or more DaoFactory methods.

Now I am going to implement one class that will implement a method to put AccessLog object into Cassandra DB.

public interface AccessLogService extends Closeable {
    void save(AccessLog log);
}
public class AccessLogCassandraServiceImpl implements AccessLogService {
    private final CqlSession session = CqlSession.builder().build();
    private final AccessLogMapper accessLogMapper = new AccessLogMapperBuilder(session).build();
    private final AccessLogDao dao = accessLogMapper.accessLogDao(CqlIdentifier.fromCql("accesslogdb"));
    public AccessLogCassandraServiceImpl() {
    }
    @Override
    public void save(AccessLog log){
        this.dao.save(log);
    }
    @Override
    public void close() throws IOException {
        this.session.close();
    }
}

The above class will be used to create a Cassandra session and Dao object, which will be used to perform DB operations.

CqlSession.builder().build() will fetch all information from application.conf file, which should be on the program classpath. In this project, I put this file in the resources folder.

application.conf file has the following info, you can provide additional required info in this file. contact-points has all nodes IP addresses.

datastax-java-driver {
  basic {
    contact-points = [ "192.168.1.22:9042", "192.168.1.23:9042" ]
    load-balancing-policy.local-datacenter = dc1
  }
}

You can also provide these information code directly.

CqlSession session = CqlSession.builder()
    .addContactPoint(new InetSocketAddress("192.168.1.22", 9042))
    .addContactPoint(new InetSocketAddress("192.168.1.23", 9042))
    .withLocalDatacenter("dc1")
    .build();

Log Processor class to parse files using concurrent task and store in Cassandra DB.

public interface AccessLogProcessorService {
    void process();
}
public class FileAccessLogProcessorServiceImpl implements AccessLogProcessorService {
    private final ExecutorService executorService;
    private final List<Path> files;
    private DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss");

    public FileAccessLogProcessorServiceImpl( List<Path> files, int noOfThreads) {
        this.files = files;
        this.executorService = Executors.newFixedThreadPool(noOfThreads);
    }

    @Override
    public void process() {

        Collection<Future<?>> futures = new LinkedList<>();
        for (Path file : this.files) {
            ProcessTask processTask = new ProcessTask(file, new AccessLogCassandraServiceImpl());
            futures.add(this.executorService.submit(processTask));
        }
//Wait to complete all concurrent task.
        int i =0;
        for (Future<?> future : futures) {
            try {
                future.get();
                System.out.println("file complete "+ i++);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
        this.executorService.shutdownNow();
    }

    class ProcessTask implements Callable<Boolean> {
        final Path file;
        final AccessLogService accessLogService;

        ProcessTask(final Path file, final AccessLogService accessLogService) {
            this.file = file;
            this.accessLogService = accessLogService;
        }

        @Override
        public Boolean call() throws Exception {
            File file = this.file.toFile();
            LineIterator it = FileUtils.lineIterator(file, "UTF-8");
            try {
                while (it.hasNext()) {
                    String line = it.nextLine();
                    AccessLog log = new AccessLog();
                    String[] parts = line.split(" - - \\[");
                    if (parts.length == 2) {
                        log.setIp(parts[0]);
                    } else {
                        this.logError(line);
                        continue;
                    }

                    parts = parts[1].split(" \\+0000\\] \"");
                    if (parts.length == 2) {
                        LocalDateTime ldt = LocalDateTime.parse(parts[0], dateTimeFormatter);
                        log.setTime(ldt.toInstant(ZoneOffset.UTC));
                    } else {
                        this.logError(line);
                        continue;
                    }
                    // Parse method
                    parts = parts[1].split(" \\/");
                    if (parts.length == 2) {
                        log.setMethod(parts[0]);
                    } else {
                        this.logError(line);
                        continue;
                    }
                    parts = parts[1].split("\"" );
                    if (parts.length == 5) {
                        log.setUrl("/"+parts[0]);
                        log.setReferral(parts[2]);
                        log.setClient(parts[4]);
                    } else {
                        this.logError(line);
                        continue;
                    }
                    parts = parts[1].trim().split(" ");
                    if(parts.length == 2) {
                        log.setStatus(Integer.parseInt(parts[0]));
                        log.setLength(Long.parseLong(parts[1]));
                    }else{
                        this.logError(line);
                        continue;
                    }
                    this.accessLogService.save(log);
                }
            } finally {
                it.close();
                this.accessLogService.close();
            }
            return true;
        }
        private void logError(String line) {
            System.out.println("Unable to parse line");
            System.out.println(line);
        }
    }
}

In the above class, I am making one executable task for each file and one Cassandra service instance which has a Cassandra session to save long into DB.

Class to get file List path

public class FileAccessLogSourceServiceImpl implements AccessLogSourceService {
    private final String logFolder;

    public FileAccessLogSourceServiceImpl(final String folder) {
        this.logFolder = folder;
    }

    @CheckForNull
    @Override
    public List<Path> getFiles() throws IOException {
        if (Files.exists(Paths.get(this.logFolder))) {
            return Files.list(Paths.get(this.logFolder)).collect(Collectors.toList());
        } else {
            return null;
        }
    }
}

Application Entry main class

public class AccessLogClientApp {

    public static void main(String[] args) throws IOException {
        if(args.length == 2) {
            String folderPath = args[0];
            int noOfThreads = Integer.parseInt(args[1]);
            AccessLogSourceService accessLogSourceService = new FileAccessLogSourceServiceImpl(folderPath);

            AccessLogProcessorService accessLogParserService =
                    new FileAccessLogProcessorServiceImpl(accessLogSourceService.getFiles(), noOfThreads);
            accessLogParserService.process();
        }
    }
}

You can find a complete working example on GitHub. I also keep one long fine inside the resources folder. cassandra-example/accesslog-client at main · sheelprabhakar/cassandra-example (github.com)

In some other post, I will write about querying Cassandra DB.

,

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.