Concurrency issues are notoriously difficult to diagnose and are often the cause of classically difficult problems such as heisenbugs. Their intermittency and resistance to reproduction both make debugging harder. Furthermore, the disparity between late-stage environments such as production, where large amounts of load compete for resources, and early-stage environments such as your development environment, where you may only have one small virtual host’s worth of load, leads to issues that only appear in production, which makes reproducibility a major problem. Moreover, there is often no stack trace when such issues occur (you probably just have a logical error in your code where the wrong value is returned), which makes life even tougher. For example, a concurrency issue in code that queries a different DB depending on the geographic location of the customer might inadvertently query the wrong DB, producing incorrect results every once in a while. This happened to me, personally, while working on Expedia’s Payment Service team. As my old engineering professor, MPBL, said, “Intermittency is the bane of [an engineer’s] existence.”

In addition to detection, testing the fix for a concurrency bug can be equally, if not more, challenging. The fact that the concurrency issue might only happen in prod, or in some stage after the dev environment, reduces the confidence in any potential fix when pushing. Often, we don’t even test for such issues in earlier-stage environments because we just don’t have confidence they will occur there. Instead, we rely exclusively on existing regression tests, and just verify that we didn’t break existing functionality. “Well, it’s only happening in prod and our fix hasn’t broken anything in QA so let’s just release it. I’m pretty sure it will fix it.” Such statements are said too frequently about such “fixes”. I know I’ve been guilty of saying things like this before. :-/ Of course, such statements are often said more than once, and the investigation into the concurrency issue often devolves into a proverbial shit show.

To help limit the scope of this post, I want to state outright that I won’t go over steps to narrow down possible concurrency issues. There are solid sources of information out there already for that, and I want to be as brief as possible. Instead, I want to focus on template code that can allow you to use TDD for possible concurrency issues that you investigate as a hypothesis as you drill down on various classes in your investigation.

With all that being said, you may be surprised or skeptical to hear that, to a significant extent, concurrency issues can be unit tested. Below is a unit test template for Java that I have used so far to employ TDD for two concurrency issues.

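// Imports assume JUnit 4; adjust the two JUnit imports for other versions.
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;

public class ThreadSafetyTest {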
  @Test
  public void isThreadSafe() throws InterruptedException {
    // Step 1: Determine the number of parallel child threads to spawn.
    int numProcs = Runtime.getRuntime().availableProcessors();
    int numThreads = numProcs * 10;

    // Step 2: Initialize the instance to be tested that will be shared amongst the spawned threads.
    // TODO: Insert code here.

    // Step 3: Create a list of tasks that will perform the unsafe operation and assert that it
    // performs as expected (e.g. deserialize() produces a JSON request with the correct date).
    // Note that each child thread should verify something unique, so each thread here verifies a
    // unique date, based on `threadOffset`.
    List<Runnable> tasks = new ArrayList<>(numThreads);
    for (int threadOffset = 0; threadOffset < numThreads; ++threadOffset) {
      // Step 3.1: Set up any expectations for the spawned test thread here. These expectations will
      // be used in Step 3.2.2 below.
      // TODO: Insert code here.

      // Step 3.2: Setup the spawned test thread
      Runnable task = new Runnable() {
        @Override
        public void run() {
          // Step 3.2.1: Do the actual work that produces incorrect result.
          // TODO: Insert code here.
          // Step 3.2.2: Compare correct expectation vs. actual result.
          // TODO: Insert code here.
        }
      };

      tasks.add(task);
    }

    // Step 4: Use an AtomicReference to capture the first exception thrown by a child thread.
    Optional<Throwable> opEmpty = Optional.empty();
    /*
     * Use AtomicReference as a means of capturing the first thrown exception, since a spawned
     * thread can't "throw" an exception to the parent thread.
     */
    final AtomicReference<Optional<Throwable>> firstThrownException =
        new AtomicReference<>(opEmpty);

    // Step 5: Construct a new ThreadPoolExecutor that will execute all tasks in parallel on different threads.
    // The new ThreadPoolExecutor will exploit the `afterExecute()` method that gets called with the thread's
    // thrown exception.
    /*
     * Use new ThreadPoolExecutor instead of Executors.newFixedThreadPool() so that I can override
     * afterExecute() for the purposes of throwing an exception from the test thread if a child thread
     * fails. Thus, a failed thread will cause the test, itself, to fail. Trick from StackOverflow
     * at:
     * http://tinyurl.com/gluof74
     */
    ExecutorService execSvc = new ThreadPoolExecutor(numThreads, numThreads,
        0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
      @Override
      public void afterExecute(Runnable task, Throwable failureCause) {
        if (failureCause == null) {
          // The Runnable completed successfully.
          return;
        }
        // only sets the first exception because it will only be empty on the first call.
        firstThrownException.compareAndSet(Optional.<Throwable> empty(),
            Optional.of(failureCause));
      }
    };

    // Step 6: Execute all the tasks in parallel, verifying that no exception was thrown from any child test
    // threads, which will occur when a date assertion fails.
    for (Runnable task : tasks) {
      execSvc.execute(task);
    }
    execSvc.shutdown();
    execSvc.awaitTermination(1, TimeUnit.HOURS);

    assertEquals(Optional.empty(), firstThrownException.get());
  }
}

Using this template, you can test-drive suspect code that you don’t believe to be thread-safe. Of course, it’s not 100% reliable: the test may still pass when a concurrency issue is present. That being said, if your code passes its single-threaded test cases, and this test passes when you set numThreads to 1 (or some low number) and then fails for larger values of numThreads, you can have a high degree of confidence that you have a concurrency issue, which this test is exposing. Again, I would like to reiterate that this template has worked for me both times I have employed it, and has greatly increased my efficiency while fixing concurrency issues.
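To make the comparison between a single-thread run and a many-thread run less tedious, the executor boilerplate can be pulled into a small helper. The following is only a sketch: the class name ConcurrentRunner and the method firstFailure() are hypothetical names introduced here, not part of the template above, and the helper wraps InterruptedException in an unchecked exception purely for convenience.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of the original template.
final class ConcurrentRunner {
  private ConcurrentRunner() {}

  /** Runs each task on its own pool thread and returns the first Throwable any task threw. */
  static Optional<Throwable> firstFailure(List<Runnable> tasks) {
    final AtomicReference<Throwable> first = new AtomicReference<>();
    int poolSize = Math.max(1, tasks.size());
    ThreadPoolExecutor pool = new ThreadPoolExecutor(poolSize, poolSize,
        0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
      @Override
      protected void afterExecute(Runnable task, Throwable failureCause) {
        super.afterExecute(task, failureCause);
        if (failureCause != null) {
          // Succeeds only while the reference is still null, so the first failure wins.
          first.compareAndSet(null, failureCause);
        }
      }
    };
    for (Runnable task : tasks) {
      pool.execute(task);
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(1, TimeUnit.HOURS)) {
        throw new IllegalStateException("Tasks did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for tasks", e);
    }
    return Optional.ofNullable(first.get());
  }
}
```

A test could then call assertEquals(Optional.empty(), ConcurrentRunner.firstFailure(tasks)) once with a single task and once with many, and compare the two outcomes.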

Let’s use this template in a real example I recently had to deal with. During some testing, when calling an in-development web service, a team member noticed that dates in our JSON requests to our new service were being set to null. The domain objects containing the dates were fine. So our hypothesis was that there was a concurrency issue in the code that converted dates to and from JSON. Consider the following stripped-down version of that class, a Spring bean singleton that wrapped a Gson instance to deserialize dates returned by our web service:

@Component
public class Foo {
  private static final String SHORT_FORMAT = "yyyy-MM-dd HH:mm:ss";

  private static final DateFormat DATE_FORMAT = new SimpleDateFormat(SHORT_FORMAT);

  private Gson gson;

  @PostConstruct
  public void init() {
    if (this.gson == null) {
      this.gson = new GsonBuilder()
          .registerTypeAdapter(Date.class, new DateDeserializer())
          .setDateFormat(SHORT_FORMAT).create();
    }
  }

  private static class DateDeserializer implements JsonDeserializer<Date> {
    @Override
    public Date deserialize(JsonElement jsonElement, Type typeOfT,
        JsonDeserializationContext context) throws JsonParseException {
      String dateAsString = jsonElement.getAsString();
      try {
        return DATE_FORMAT.parse(dateAsString);
      } catch (Exception e) {
        return null;
      }
    }
  }

  public FooRequest deserialize(String json) {
    FooRequest request = this.gson.fromJson(json, FooRequest.class);
    return request;
  }
}

Note that the corresponding code for FooRequest was:

public class FooRequest {

  @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
  private Date searchDate;

  public Date getSearchDate() {
    return this.searchDate;
  }

  public void setSearchDate(Date searchDate) {
    this.searchDate = searchDate;
  }
}

The above is a simplified version of code I recently had to deal with, though not so simple that it eliminates all possible causes of the concurrency issue. What this code does is initialize a non-final Gson instance via the init() method, which checks whether the gson field is null, without any synchronization, and initializes it if so. After the initialization, clients may call the deserialize() method to turn a JSON string into a FooRequest instance, with dates parsed via the DATE_FORMAT static constant. A simplified version of the test class based off the above template is as follows:


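// Imports assume JUnit 4; adjust the JUnit imports for other versions.
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.junit.Test;

public class FooThreadSafetyTest {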
  private static final String BASE_REQUEST = "{" +
      "\"searchDate\":\"2015-11-23 12:34:56\"" +
      "}";

  private static final Pattern SEARCH_DATE_REGEX = Pattern.compile("\"searchDate\":\"([^\"]+)\"");

  private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

  @Test
  public void isThreadSafe() throws ParseException, InterruptedException {
    // Step 1: Determine the number of parallel child threads to spawn.
    int numProcs = Runtime.getRuntime().availableProcessors();
    int numThreads = numProcs * 10;

    // Step 2: Initialize the instance to be tested that will be shared amongst the spawned threads.
    final Foo foo = new Foo();
    foo.init(); 

    // Step 3: Create a list of tasks that will perform the unsafe operation and assert that it
    // performs as expected (e.g. deserialize() produces a JSON request with the correct date).
    // Note that each child thread should verify something unique, so each thread here verifies a
    // unique date, based on `threadOffset`.
    List<Runnable> tasks = new ArrayList<>(numThreads);
    for (int threadOffset = 0; threadOffset < numThreads; ++threadOffset) {
      StringBuilder rqBuilder = new StringBuilder(BASE_REQUEST);

      final Date expSearchDate = updateDate(rqBuilder, SEARCH_DATE_REGEX, threadOffset);

      final String rq = rqBuilder.toString();

      Runnable task = new Runnable() {
        @Override
        public void run() {
          FooRequest fooRq = foo.deserialize(rq);
          Date actSearchDate = fooRq.getSearchDate();
          assertEquals(expSearchDate, actSearchDate);
        }
      };

      tasks.add(task);
    }

    // Step 4: Use an AtomicReference to capture the first exception thrown by a child thread.
    Optional<Throwable> opEmpty = Optional.empty();
    /*
     * Use AtomicReference as a means of capturing the first thrown exception, since a spawned
     * thread can't "throw" an exception to the parent thread.
     */
    final AtomicReference<Optional<Throwable>> firstThrownException =
            new AtomicReference<>(opEmpty);

    // Step 5: Construct a new ThreadPoolExecutor that will execute all tasks in parallel on different threads.
    // The new ThreadPoolExecutor will exploit the `afterExecute()` method that gets called with the thread's
    // thrown exception.
    /*
     * Use new ThreadPoolExecutor instead of Executors.newFixedThreadPool() so that I can override
     * afterExecute() for the purposes of throwing an exception from the test thread if a child thread
     * fails. Thus, a failed thread will cause the test, itself, to fail. Trick from StackOverflow
     * at:
     * http://tinyurl.com/gluof74
     */
    ExecutorService execSvc = new ThreadPoolExecutor(numThreads, numThreads,
        0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
      @Override
      public void afterExecute(Runnable task, Throwable failureCause) {
        if (failureCause == null) {
          // The Runnable completed successfully.
          return;
        }
        // only sets the first exception because it will only be empty on the first call.
        firstThrownException.compareAndSet(Optional.<Throwable> empty(),
            Optional.of(failureCause));
      }
    };

    // Step 6: Execute all the tasks in parallel, verifying that no exception was thrown from any child test
    // threads, which will occur when a date assertion fails.
    for (Runnable task : tasks) {
      execSvc.execute(task);
    }
    execSvc.shutdown();
    execSvc.awaitTermination(1, TimeUnit.HOURS);

    assertEquals(Optional.empty(), firstThrownException.get());
  }

   /**
    * Replaces the date matched in {@code rqBuilder} using {@code dateRegex} with a new date. The new date
    * is the matched date plus the {@code threadOffset}.
    */
  private Date updateDate(StringBuilder rqBuilder, Pattern dateRegex, int threadOffset)
        throws ParseException {
    Matcher searchDateMatcher = dateRegex.matcher(rqBuilder);
    boolean foundSearchDate = searchDateMatcher.find();
    assertTrue(foundSearchDate);
    int dateStartIndex = searchDateMatcher.start(1);
    int dateEndIndex = searchDateMatcher.end(1);

    // Read from rqBuilder (the text the matcher ran against) rather than BASE_REQUEST.
    String strDate = rqBuilder.substring(dateStartIndex, dateEndIndex);
    Date date = DATE_FORMAT.parse(strDate);
    Calendar calendar = Calendar.getInstance();
    calendar.setTime(date);
    calendar.add(Calendar.DATE, threadOffset);

    Date modifiedDate = calendar.getTime();
    String strModifiedDate = DATE_FORMAT.format(modifiedDate);
    rqBuilder.replace(dateStartIndex, dateEndIndex, strModifiedDate);

    return modifiedDate;
  }
}

Basically, what this unit test does is kick off a large number of test threads (ten times the number of processors on the host). Each thread deserializes a unique JSON request string into a request object and compares that object’s actual date against the expected date. The deserializations and comparisons happen in parallel, with one deserialization and subsequent comparison per thread. If any date is not equal to its expected value, that thread throws an AssertionError, which is picked up by execSvc’s afterExecute() method. The first exception picked up by afterExecute() is stored in the firstThrownException AtomicReference, and the final assertEquals() on the test thread then fails the overall test.
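One detail this mechanism depends on is that the tasks are handed to the pool with execute(). With submit(), the pool wraps the task in a Future that captures the exception itself, so afterExecute() receives null for the Throwable. The sketch below illustrates the difference; the class name AfterExecuteDemo and its method are hypothetical names used only for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original test.
final class AfterExecuteDemo {
  private AfterExecuteDemo() {}

  /** Runs one always-failing task and returns the Throwable afterExecute() was given. */
  static Throwable seenByAfterExecute(boolean useSubmit) {
    final AtomicReference<Throwable> seen = new AtomicReference<>();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
      @Override
      protected void afterExecute(Runnable task, Throwable failureCause) {
        super.afterExecute(task, failureCause);
        seen.compareAndSet(null, failureCause);
      }
    };
    Runnable failing = () -> {
      throw new AssertionError("simulated assertion failure");
    };
    if (useSubmit) {
      pool.submit(failing);   // exception is captured inside the returned Future
    } else {
      pool.execute(failing);  // exception is passed to afterExecute()
    }
    pool.shutdown();
    try {
      pool.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for the task", e);
    }
    return seen.get();
  }
}
```

If you prefer submit(), one option is to keep the returned Future objects and call get() on each from the test thread instead of relying on afterExecute().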

On my machine, when I run this test against the previously defined classes Foo and FooRequest, I get an AssertionError from FooThreadSafetyTest. Basically, an empty Optional was expected, but an Optional containing an AssertionError was found. This outer assertion error happened in the test code at:

assertEquals(Optional.empty(), firstThrownException.get());

The inner assertion error happens in the anonymous Runnable’s run() method at:

assertEquals(expSearchDate, actSearchDate);

For this case, expSearchDate was Tue Feb 09 12:34:56 PST 2016 but actSearchDate was erroneously null, thus failing the test.

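Based on your knowledge of the SimpleDateFormat class, you may arrive at the hypothesis that SimpleDateFormat is not as thread-safe as you thought it was. Note that DateDeserializer swallows every exception and returns null, so a parse that fails under concurrent use would surface as exactly the null dates we observed. As such, you may decide to wrap the SimpleDateFormat in a ThreadLocal instance. Your new code may be as follows: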

@Component
public class Foo {
  private static final String SHORT_FORMAT = "yyyy-MM-dd HH:mm:ss";

  private static final ThreadLocal<DateFormat> DATE_FORMATS = new ThreadLocal<DateFormat>() {
    @Override
    protected DateFormat initialValue() {
      return new SimpleDateFormat(SHORT_FORMAT);
    }
  };

  private Gson gson;

  @PostConstruct
  public void init() {
    if (this.gson == null) {
      this.gson = new GsonBuilder()
        .registerTypeAdapter(Date.class, new DateDeserializer())
        .setDateFormat(SHORT_FORMAT).create();
    }
  }

  private static class DateDeserializer implements JsonDeserializer<Date> {
    @Override
    public Date deserialize(JsonElement jsonElement, Type typeOfT,
      JsonDeserializationContext context) throws JsonParseException {
      String dateAsString = jsonElement.getAsString();
      try {
        DateFormat dateFormat = DATE_FORMATS.get();
        return dateFormat.parse(dateAsString);
      } catch (Exception e) {
        return null;
      }
    }
  }

  public FooRequest deserialize(String json) {
    FooRequest request = this.gson.fromJson(json, FooRequest.class);
    return request;
  }
}

If you rerun the test against the above code, you’ll find that it passes. The hypothesis proves correct, and we may now have confidence that this block of code did contain a race condition in the manner we predicted but no longer does. Thus, the concurrency issue was corrected.
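For what it’s worth, the ThreadLocal fix is not the only option. The sketch below shows two alternatives, assuming Java 8 or later: a shorter ThreadLocal declaration via ThreadLocal.withInitial(), and java.time’s DateTimeFormatter, which is immutable and thread-safe and so can be shared directly. The class name SafeDateParsing is hypothetical, and the second option assumes you are free to change the field type from Date to LocalDateTime, which the original code did not do.

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical class illustrating two alternatives; not the code from the post.
final class SafeDateParsing {
  private static final String SHORT_FORMAT = "yyyy-MM-dd HH:mm:ss";

  // Option 1: the same per-thread SimpleDateFormat idea, using the Java 8 factory method.
  static final ThreadLocal<DateFormat> DATE_FORMATS =
      ThreadLocal.withInitial(() -> new SimpleDateFormat(SHORT_FORMAT));

  // Option 2: DateTimeFormatter is immutable, so a single shared instance is safe.
  private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern(SHORT_FORMAT);

  private SafeDateParsing() {}

  /** Parses text such as "2015-11-23 12:34:56" without any shared mutable state. */
  static LocalDateTime parse(String text) {
    return LocalDateTime.parse(text, FORMATTER);
  }
}
```

Either variant can be checked with the same thread-safety test, adjusting the expected type if you move to LocalDateTime.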

For fellow fans of Michael Feathers’s book, Working Effectively with Legacy Code, you can argue that this isn’t truly a unit test, because tests such as this one “don’t run fast”. Nonetheless, in my recent experience with this test, any slowness is imperceptible. On my machines, it took less than 1ms.

Note also the reliability of this test template. It can produce a false pass, where the test passes despite a race condition being present. That being said, I still haven’t seen one so far. Conversely, assuming the test passes for the single-thread version (i.e. where numThreads is 1), it should never fail unless a concurrency issue is present (notwithstanding errors like OutOfMemoryError, StackOverflowError, etc.). Thus, false passes are rare and false failures are even rarer.
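If you do run into a test that passes despite a suspected race, one thing that may improve the odds of the threads actually overlapping is a start gate: every thread waits until all of them are ready, and only then do they run the unsafe operation. The following is a sketch using CyclicBarrier; the class name StartGate and the method gated() are hypothetical and not part of the template, and the approach assumes the pool has at least as many threads as tasks (which holds for the template, where both equal numThreads).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the original template.
final class StartGate {
  private StartGate() {}

  /** Wraps each task so that it starts only once every wrapped task has reached the gate. */
  static List<Runnable> gated(List<Runnable> tasks) {
    List<Runnable> gatedTasks = new ArrayList<>(tasks.size());
    if (tasks.isEmpty()) {
      return gatedTasks;
    }
    final CyclicBarrier barrier = new CyclicBarrier(tasks.size());
    for (final Runnable task : tasks) {
      gatedTasks.add(() -> {
        try {
          // The timeout avoids hanging forever if the pool has fewer threads than tasks.
          barrier.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
          throw new IllegalStateException("Threads did not all reach the start gate", e);
        }
        task.run();
      });
    }
    return gatedTasks;
  }
}
```

In the template, this would mean iterating over StartGate.gated(tasks) instead of tasks in Step 6. It does not guarantee that a race will be exposed; it only removes the head start that early threads otherwise get.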

If you have actually gotten this far in the post, thanks for reading. I hope this post and template code prove useful to you in the future.
