The Amazon AWS Java SQS Client is *Not* Thread-safe

I recently added long polling to an Amazon SQS project that processes thousands of messages a minute. The idea was simple:

  • Spawn N threads (let’s say 60) that repeatedly check an SQS queue using long polling
  • Each thread waits for at most one message for maximum concurrency, restarting if no message is found
  • Each time a message is found, the thread processes it and ACKs via deleteMessage() (failure to do so causes the message to go back on the queue once the visibility timeout expires)

For convenience, I used the Java Concurrency API ScheduledExecutorService.scheduleWithFixedDelay() method, scheduling each thread with a 1 millisecond delay, although I could have accomplished the same thing using the Thread class and an infinite while() loop. With short polling, this kind of structure would tend to thrash, but with long polling, each thread is just waiting when there are no messages available. Note: Java does not allow a 0 millisecond delay for this method (it throws an IllegalArgumentException), so 1 millisecond it is!
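The worker structure described above can be sketched without AWS at all. This is a minimal illustration, not the project's code: an in-memory BlockingQueue stands in for the SQS queue, poll() with a timeout stands in for a long-polling receive, and the class and method names (LongPollWorkers, drain, zeroDelayRejected) are made up for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LongPollWorkers {

    /** Starts the given number of workers, each waiting for at most one message per run; returns how many were processed. */
    static int drain(int threads, int messages, long waitMillis) {
        // In-memory stand-in for the SQS queue (an assumption of this sketch)
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add("test " + i);
        }
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch allProcessed = new CountDownLatch(messages);
        ScheduledExecutorService workers = Executors.newScheduledThreadPool(threads);
        try {
            for (int i = 0; i < threads; i++) {
                // Each worker reruns 1 ms after its previous run finishes
                workers.scheduleWithFixedDelay(() -> {
                    try {
                        // Simulated long poll: block until a message arrives or the wait time elapses
                        String message = queue.poll(waitMillis, TimeUnit.MILLISECONDS);
                        if (message != null) {
                            processed.incrementAndGet();
                            allProcessed.countDown();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, 0, 1, TimeUnit.MILLISECONDS);
            }
            allProcessed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdownNow();
        }
        return processed.get();
    }

    /** Shows that a 0 ms delay is rejected by scheduleWithFixedDelay(). */
    static boolean zeroDelayRejected() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        try {
            pool.scheduleWithFixedDelay(() -> { }, 0, 0, TimeUnit.MILLISECONDS);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("processed: " + drain(4, 10, 200));
        System.out.println("0 ms delay rejected: " + zeroDelayRejected());
    }
}
```

One thing to keep in mind with this structure: if a scheduled task throws an exception, the executor silently cancels its later runs, so real workers should catch and log failures inside the task.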

Noticing the Problem
When I started testing my new version based on long polling, I noticed something quite odd. While the messages all seemed to be processed quickly (1-10 milliseconds) and there were no errors in the logs, the AWS Console showed 50+ messages in-flight. Based on the number of messages being processed per second and the time it was taking to process them, the in-flight counter should have been only 3-4 messages at any given time, but it consistently stayed high.

Isolating the Issue
I knew it had something to do with long polling, since previously with short polling I never saw that many messages consistently in flight, but it took a long time to isolate the bug. I discovered that in certain circumstances the Amazon AWS Java SQS Client is not thread-safe. Apparently, the deleteMessage() call can block if too many other threads are performing long polling. For example, if you set the long polling to 10 seconds, the deleteMessage() can block for 10 seconds. If you set long polling to 20 seconds, the deleteMessage() can block for 20 seconds, and so on. Below is a sample class which reproduces the issue. You may have to run it multiple times and/or increase the number of polling threads, but you should see intermittent delays in deleting messages between the two println() calls that surround deleteMessage() (Lines 25 and 27 of the listing).

package net.selikoff.aws;

import java.util.concurrent.*;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.*;
import com.amazonaws.services.sqs.model.*;

public class SQSThreadSafeIssue {
	private final String queueName;
	private final AmazonSQS sqsClient;
	private final int numberOfThreads;
	
	public SQSThreadSafeIssue(Regions region, String queueName, int numberOfThreads) {
		super();
		this.queueName = queueName;
		this.sqsClient = AmazonSQSClientBuilder.standard().withRegion(region).build(); // Relies on locally available AWS creds
		this.numberOfThreads = numberOfThreads;
	}
	
	private void readAndProcessMessages(ReceiveMessageRequest receiveMessageRequest) {
		final ReceiveMessageResult result = sqsClient.receiveMessage(receiveMessageRequest);
		if(result!=null && result.getMessages()!=null && result.getMessages().size()>0) {
			result.getMessages().forEach(m -> {
				final long start = System.currentTimeMillis();
				System.out.println("1: Message read from queue");
				sqsClient.deleteMessage(new DeleteMessageRequest(queueName, m.getReceiptHandle()));
				System.out.println("2: Message deleted from queue in "+(System.currentTimeMillis()-start)+" milliseconds");
			});
		}
	}
	
	private void createMessages(int count) {
		for(int i=0; i<count; i++) {
			sqsClient.sendMessage(queueName, "test "+System.currentTimeMillis());
		}
	}
	
	public void produceThreadSafeProblem(int numberOfMessagesToAdd) {
		// Start up and add some messages to the queue
		createMessages(numberOfMessagesToAdd);
		
		// Create thread executor service
		final ScheduledExecutorService queueManagerService = Executors.newScheduledThreadPool(numberOfThreads);
		
		// Create reusable request object with 20 second long polling
		final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
		receiveMessageRequest.setQueueUrl(queueName);
		receiveMessageRequest.setMaxNumberOfMessages(1);
		receiveMessageRequest.setWaitTimeSeconds(20);
		
		// Schedule some thread processors
		for(int i=0; i<numberOfThreads; i++) {
			queueManagerService.scheduleWithFixedDelay(() -> readAndProcessMessages(receiveMessageRequest),0,1,TimeUnit.MILLISECONDS);
		}
	}
	
	public static void main(String[] args) {
		final SQSThreadSafeIssue issue = new SQSThreadSafeIssue(Regions.YOUR_REGION_HERE,"YOUR_QUEUE_NAME_HERE",60);
		issue.produceThreadSafeProblem(5);
	}
}

Below is sample output from this class, showing that each message took 20 seconds (the long polling time) to be deleted.

1: Message read from queue
1: Message read from queue
1: Message read from queue
1: Message read from queue
1: Message read from queue
2: Message deleted from queue in 20059 milliseconds
2: Message deleted from queue in 20098 milliseconds
2: Message deleted from queue in 20024 milliseconds
2: Message deleted from queue in 20035 milliseconds
2: Message deleted from queue in 20038 milliseconds

Note: The SQSThreadSafeIssue class requires Java 8 or higher along with the AWS SDK for Java (the SQS module and its dependencies) to compile and run. It was written against version 1.11.278 of the SDK, the latest available from AWS at the time of writing (although not yet on mvnrepository.com).

Understanding the Problem
Now that we see messages are taking 20 seconds (the long polling time) to be deleted, the large number of messages in-flight makes total sense. If the messages are taking 20 seconds to be deleted, what we are seeing is the total number of in-flight messages over the last 20 second window waiting to be deleted, which is not a ‘true measure’ of in-flight messages actually being processed. The more threads you add, say 100-200, the easier the issue becomes to reproduce. What’s especially interesting is that the polling threads don’t seem to be blocking each other. For example, if 50 messages come in at once and there are 100 threads available, then all 50 messages get read immediately, while not a single deleteMessage() is allowed through.

So where does the problem lie? That’s easy. Despite being advertised as @ThreadSafe in the API documentation, the AmazonSQS client is certainly not thread-safe and appears to have a limit on the number of connections it can use at once. While I imagine this doesn’t come up often when using the default short polling, it is not difficult to reproduce this problem when long polling is enabled in a multi-threaded environment.

Finding a Solution
The solution? Oh, that’s trivial. So trivial, I was tempted to leave it as an exercise for the reader! But since I’m hoping AWS developers will read this article and fully understand the bug so they can apply a patch, here goes…

You just need to create two AmazonSQS instances in the constructor of SQSThreadSafeIssue, one for reading (the receiveMessage() call on Line 21) and one for deleting (the deleteMessage() call on Line 26). Once you have two distinct clients, the deletes all happen within a few milliseconds. Once applied to the original project I was working on, the number of in-flight messages dropped significantly, to a level much closer to what I expected.
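Why two clients help can be sketched with plain Java, under one loudly stated assumption: that the delay comes from a bounded connection pool inside the client (in SDK 1.x, ClientConfiguration has a max-connections setting that defaults to 50). FakeClient below is a hypothetical stand-in built on a Semaphore, not the real AmazonSQS client, and the numbers (3 connections, 500 ms polls) are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class ConnectionPoolSketch {

    /** Hypothetical stand-in for an SDK client with a bounded connection pool. */
    static final class FakeClient {
        private final Semaphore connections;

        FakeClient(int maxConnections) {
            this.connections = new Semaphore(maxConnections);
        }

        /** Holds one connection for the whole call, the way a long poll holds its HTTP connection. */
        void call(long durationMillis, Runnable onConnected) throws InterruptedException {
            connections.acquire();
            try {
                onConnected.run();
                Thread.sleep(durationMillis);
            } finally {
                connections.release();
            }
        }
    }

    /** Times one short "delete" on deleter while the given number of long polls are in flight on reader. */
    static long timeDeleteMillis(FakeClient reader, FakeClient deleter, int pollers, long pollMillis) {
        ExecutorService threads = Executors.newFixedThreadPool(pollers);
        CountDownLatch connected = new CountDownLatch(pollers);
        try {
            for (int i = 0; i < pollers; i++) {
                threads.submit(() -> {
                    reader.call(pollMillis, connected::countDown);
                    return null; // a Callable, so call() may throw InterruptedException
                });
            }
            connected.await(); // every poller now holds a connection
            long start = System.nanoTime();
            deleter.call(1, () -> { });
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            threads.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // One client for everything: the delete waits for a poll to give back a connection
        FakeClient shared = new FakeClient(3);
        System.out.println("shared client:    delete took "
                + timeDeleteMillis(shared, shared, 3, 500) + " ms");
        // Separate clients: the delete has its own pool and returns right away
        System.out.println("separate clients: delete took "
                + timeDeleteMillis(new FakeClient(3), new FakeClient(3), 3, 500) + " ms");
    }
}
```

In the shared case the delete takes roughly as long as a poll, which mirrors the 20 second deletes in the output above. If the connection-pool assumption holds, raising the limit with ClientConfiguration.withMaxConnections() when building a single client may be an alternative to two clients, but nothing in the reproduction above verifies that.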

Although this workaround is easy to apply, it should not be necessary, i.e., you should be able to reuse the same client. In fact, AWS documentation often encourages you to do so. The fact that the Amazon SQS client is not thread-safe when long polling is enabled is a very serious issue, one I’m hoping AWS will resolve in a timely manner!

getting started with gradle in eclipse on a mac

I’ve used Gradle before, although not a ton. For example, I compared eGradle and Buildship here. It’s been a while though, which means my laptop setup has changed enough for it to be an adventure (and not everything I needed to know was in my memory).

Eclipse Oxygen comes with Gradle installed so I thought this would be easy. And if I had never installed Java 9 on my computer, that would have been the case. After all, you just need to run: File > New > Gradle Project

The original problem

I tried that and got this error:

org.gradle.tooling.GradleConnectionException: Could not fetch model of type 'BuildEnvironment' using Gradle distribution 'https://services.gradle.org/distributions/gradle-3.5-bin.zip'.

Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make protected java.lang.Package[] java.lang.ClassLoader.getPackages() accessible: module java.base does not "opens java.lang" to unnamed module @2e59aa

I got a similar error when trying to import an existing Gradle project. I have Gradle installed on the command line and it works fine.

What didn’t work 

I want to be using Java 8. I tried the following things that didn’t help:

  • Changing eclipse.ini to use Java 8
  • Removing all Java 9 JDKs from the Eclipse workspace preferences
  • Adding a system JAVA_HOME environment variable pointing to Java 8
  • This workaround editing the build.gradle file
  • A new workspace (my teammate teases me that my workspace was around when Obama got elected – I often do an upgrade in place)
  • Gradle code for Eclipse
  • Checked my versions against what is known to work. I do have Buildship 2.2+ (2.2.1) and Gradle 4.3+ (gradle 4.5).
  • Re-installing Eclipse and not installing/updating any plugins

Updating the plugin

Then I updated the Buildship plugin in Eclipse. Since I had done this in the past, I had to:

  1. Delete the Buildship update site
  2. Re-add the current update site. (Thanks to this post for the suggestion)
  3. Update plugin
  4. Restart Eclipse

That “helped” in that I got a different error:

Caused by: org.gradle.internal.exceptions.LocationAwareException: Unable to make protected void java.net.URLClassLoader.addURL(java.net.URL) accessible: module java.base does not "opens java.net" to unnamed module @6740e13b

This time it created the project on disk and merely didn’t import it. Importing had the same problem though. There is at least one known error with Java 9 and Buildship. And I’m getting a Java 9 error, so clearly I still have something pointing to Java 9 on my system.

What actually worked

Running java -version showed I had Java 9 as my system default. This is because Mac defaults to the latest installed version. I didn’t have a problem earlier because Eclipse itself was set to use Java 8 (except when I’m playing with Java 9).

I actually had to disable Java 9 on the Mac so it would recognize Java 8 as the default. This post shares how:

  1. cd /Library/Java/JavaVirtualMachines
  2. cd into my Java 9 directory
  3. cd Contents
  4. mv Info.plist Info.plist.disabled
  5. Restart Eclipse

And a quick review of Gradle/Buildship

  1. You can intersperse running command line gradle commands and using Buildship. Since everything is stored in the gradle directories, they share state with each other.
  2. To run “gradle clean” or “gradle build” in Buildship the first time:
    1. Go to the Gradle Tasks view
    2. Expand project
    3. Expand build
    4. Right click clean (or build)
    5. The Gradle Executions view shows what is still running
    6. The Console view shows the command line output
  3. To run “gradle clean” or “gradle build” in Buildship the second time:
    1. Buildship automatically creates a run configuration for any builds you run. They have clear names (ex: “atlanta-tourism – clean”) so you can easily find the right one. You can sticky common ones to the top since they are normal Eclipse run configurations
  4. To delete the whole gradle cache: rm -r /Users/xxx/.gradle/caches
  5. To delete a specific artifact: rm /Users/nyjeanne/.gradle/caches/modules-2/files-2.1/… (I forgot to include a dependency and seeing everything re-downloaded helped)


JavaOne – Optional – the mother of all bikesheds

“Optional – the mother of all bikesheds”

Speaker: Stuart Marks
#OptionalBikeshed

The deck is available online

For more blog posts from JavaOne, see the table of contents


Showed early version of “optional” (stream returning null)

Review of basics

  • Two states – present (contains non null reference) or absent
  • Don’t say “null optional”. Say “absent” or “empty”
  • Optionals are immutable
  • equals() and hashCode() work as one would expect – useful for JUnit because you can call assertEquals()
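The basics above can be tried in a few lines. This is only an illustrative sketch, not code from the talk; findNickname() and its data are made up for the example.

```java
import java.util.Optional;

class OptionalBasics {

    /** Hypothetical lookup: only "duke" has a nickname in this example. */
    static Optional<String> findNickname(String user) {
        if ("duke".equals(user)) {
            return Optional.of("Duke");   // present: wraps a non-null reference
        }
        return Optional.empty();          // absent (never return null here)
    }

    public static void main(String[] args) {
        Optional<String> present = findNickname("duke");
        Optional<String> absent = findNickname("nobody");

        System.out.println(present.isPresent());
        System.out.println(absent.isPresent());

        // equals() compares the contained values, so two Optionals can be compared directly
        System.out.println(present.equals(Optional.of("Duke")));
        System.out.println(absent.equals(Optional.empty()));
    }
}
```

Because equals() compares contents, a JUnit assertEquals(Optional.of("Duke"), findNickname("duke")) works without unwrapping anything.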

New in Java 9

  • stream() – stream with 0 or 1 elements
  • ifPresentOrElse(Consumer, Runnable)
  • or(Supplier)
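A short sketch of the three Java 9 additions, for anyone who wants to see them side by side. It requires Java 9 or later; the class and method names are made up for illustration and are not from the deck.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

class OptionalJava9 {

    /** stream(): each Optional becomes a stream of 0 or 1 elements, so flatMap drops the empty ones. */
    static List<String> presentValues(List<Optional<String>> values) {
        return values.stream()
                .flatMap(Optional::stream)
                .collect(Collectors.toList());
    }

    /** ifPresentOrElse(): one action for the present case, another for the empty case. */
    static String describe(Optional<String> value) {
        StringBuilder description = new StringBuilder();
        value.ifPresentOrElse(
                v -> description.append("found ").append(v),
                () -> description.append("nothing"));
        return description.toString();
    }

    /** or(): supplies another Optional when this one is empty, and stays an Optional. */
    static Optional<String> firstAvailable(Optional<String> primary, Optional<String> fallback) {
        return primary.or(() -> fallback);
    }

    public static void main(String[] args) {
        System.out.println(describe(Optional.of("deck")));
        System.out.println(describe(Optional.empty()));
        System.out.println(firstAvailable(Optional.empty(), Optional.of("backup")));
    }
}
```

Note the difference between or() and orElse(): or() keeps you inside Optional, so more calls can be chained after it.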

Good code and Best practices

  • Never set an Optional variable to null. It should default to Optional.empty().
  • If a method returns Optional, never return null from it either.
  • Intended for return types where there might or might not be a value to return.
  • Don’t call get() blindly, since the Optional could be empty
  • Use isPresent() and get() as a last resort
  • Using .map().orElse() chained after a stream pipeline gives fluent API calls
  • Calling filter() returns the original Optional only if it is non-empty and its value matches; otherwise it returns empty. Using filter() and orElseThrow() is good for validation
  • [and more see the deck]
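Two of these practices are easier to remember with code in front of you. The following is a minimal sketch with made-up method names, not an example taken from the deck.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class OptionalPractices {

    /** map().orElse() chained after a stream pipeline, instead of isPresent()/get(). */
    static String longestNameUpper(List<String> names) {
        return names.stream()
                .max(Comparator.comparingInt(String::length)) // Optional<String>, empty for an empty list
                .map(String::toUpperCase)                     // applied only when a value is present
                .orElse("(none)");                            // fallback for the empty case
    }

    /** filter() + orElseThrow() as validation: an empty or non-matching value is rejected. */
    static int requirePositive(Optional<Integer> value) {
        return value
                .filter(v -> v > 0)
                .orElseThrow(() -> new IllegalArgumentException("value must be present and positive"));
    }

    public static void main(String[] args) {
        System.out.println(longestNameUpper(List.of("al", "stuart")));
        System.out.println(longestNameUpper(List.of()));
        System.out.println(requirePositive(Optional.of(5)));
    }
}
```

In both methods the empty case is handled by the chain itself, so there is no branch where get() could be called on an empty Optional.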

My take: Amazed there is so much to say about Optional! Excellent set of best practices. And excellent point on not calling an optional “null.” I needed to leave early as my session was immediately after this one. And I really wanted to know “what’s with the bikeshed”, which is covered last. It’s a nice collection of quotes. So I found the deck and best practices online. There’s also a YouTube video linked so you can watch at home.